Rubyのマルチスレッド化


システム上で実行されているすべてのプログラムはプロセスです。各プロセスには 1 つ以上のスレッドが含まれます。

スレッドは、プログラム内の単一の順次制御プロセスであり、複数のスレッドを同時に実行して、単一のプログラム内で異なるタスクを完了することをマルチスレッドと呼びます。

Ruby では、Thread クラスを通じてマルチスレッドを作成できます。Ruby のスレッドは軽量であり、効率的な方法で並列コードを実装できます。


Ruby スレッドを作成する

新しいスレッドを開始するには、Thread.new:

# 线程 #1 代码部分
Thread.new {
  # 线程 #2 执行代码
}
# 线程 #1 执行代码

Example

を呼び出すだけです。次の例は、Ruby プログラムでマルチスレッドを使用する方法を示しています:

#!/usr/bin/ruby

def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end

def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end

puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"


上記コードの実行結果は次のとおりです:

Started At Wed May 14 08:21:54 -0700 2014
func1 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:55 -0700 2014
func1 at: Wed May 14 08:21:56 -0700 2014
func2 at: Wed May 14 08:21:56 -0700 2014
func1 at: Wed May 14 08:21:58 -0700 2014
End at Wed May 14 08:22:00 -0700 2014

スレッド ライフ サイクル

1. Thread.new を使用してスレッドを作成することもできます。同じ構文で Thread.start または Thread.fork の 3 つのメソッドを使用することもできます。

2. スレッドを作成した後に開始する必要はなく、スレッドは自動的に実行されます。

3. Thread クラスは、スレッドを制御するためのいくつかのメソッドを定義します。スレッドは Thread.new 内のコード ブロックを実行します。

4. スレッド コード ブロックの最後のステートメントはスレッド メソッドを通じて呼び出すことができ、スレッドの実行が完了した場合はスレッドの値が返されます。スレッドの実行が完了しました。

5. Thread.current メソッドは、現在のスレッドを表すオブジェクトを返します。 Thread.main メソッドはメインスレッドを返します。

6. Thread.Join メソッドを通じてスレッドを実行します。このメソッドは、現在のスレッドが実行を完了するまでメインスレッドを一時停止します。


スレッドステータス

スレッドには5つの状態があります:

スレッドステータス戻り値
実行可能ファイルrun
SleepingSleeping
終了中止
正常終了false
異常終了が発生しましたnil

スレッドと例外

スレッドで例外が発生し、レスキューによって捕捉されなかった場合、スレッドは通常、警告なしに終了されます。ただし、Thread#join により他のスレッドがこのスレッドを待機している場合、待機中のスレッドにも同じ例外がスローされます。

begin
  t = Thread.new do
    Thread.pass    # 主线程确实在等join
    raise "unhandled exception"
  end
  t.join
rescue
  p $!  # => "unhandled exception"
end

例外によるスレッドの終了時にインタプリタが中断できるようにするには、次の 3 つのメソッドを使用します。

  • スクリプトの起動時に-dオ​​プションを指定し、デバッグモードで実行します。

  • Thread.abort_on_Exception でフラグを設定します。 Thread.abort_on_exception设置标志。

  • 使用Thread#abort_on_exception

Thread#abort_on_Exception を使用して、指定したスレッドにフラグを設定します。


上記の 3 つの方法のいずれかを使用すると、インタープリタ全体が中断されます。

t = Thread.new { ... }
t.abort_on_exception = true

スレッド同期制御

Rubyでは、同期を実現するための3つの方法が提供されています:

1. スレッド同期は、Mutexクラスを通じて実現されます

2。 3 . ConditionVariable を使用して同期制御を実装します

スレッドの同期は Mutex クラスを通じて実現されます

プログラム変数が同時に複数のスレッドで必要な場合、この変数は部分的にロックできます。ロックを使用して。 コードは次のとおりです:

#!/usr/bin/ruby

require "thread"
puts "Synchronize Thread"

@num=200
@mutex=Mutex.new

def buyTicket(num)
     @mutex.lock
          if @num>=num
               @num=@num-num
               puts "you have successfully bought #{num} tickets"
          else
               puts "sorry,no enough tickets"
          end
     @mutex.unlock
end

ticket1=Thread.new 10 do
     10.times do |value|
     ticketNum=15
     buyTicket(ticketNum)
     sleep 0.01
     end
end

ticket2=Thread.new 10 do
     10.times do |value|
     ticketNum=20
     buyTicket(ticketNum)
     sleep 0.01
     end
end

sleep 1
ticket1.join
ticket2.join

上記のコードの実行結果は次のとおりです:

Synchronize Thread
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets

lock を使用して変数をロックすることに加えて、try_lock を使用して変数をロックすることもできます。また、Mutex.synchronize を使用することもできます。特定の変数へのアクセスを同期します。

データ転送を監視する Queue クラスはスレッド同期を実装します

Queue クラスは、スレッドをサポートし、キューの末尾に同期的にアクセスできるキューを表します。異なるスレッドが同じペア クラスを使用できますが、このキュー内のデータが同期できるかどうかを心配する必要はありません。さらに、SizedQueue クラスを使用すると、キューの長さを制限できるため、非常に便利です。このキューに参加する限り、スレッド同期について心配する必要はありません。

古典的なプロデューサーとコンシューマーの問題:

#!/usr/bin/ruby

require "thread"
puts "SizedQuee Test"

queue = Queue.new

producer = Thread.new do
     10.times do |i|
          sleep rand(i) # 让线程睡眠一段时间
          queue << i
          puts "#{i} produced"
     end
end

consumer = Thread.new do
     10.times do |i|
          value = queue.pop
          sleep rand(i/2)
          puts "consumed #{value}"
     end
end

consumer.join


プログラム出力:

SizedQuee Test
0 produced
1 produced
consumed 0
2 produced
consumed 1
consumed 2
3 produced
consumed 34 produced

consumed 4
5 produced
consumed 5
6 produced
consumed 6
7 produced
consumed 7
8 produced
9 produced
consumed 8
consumed 9


スレッド変数

スレッドは独自のプライベート変数を持つことができ、スレッドのプライベート変数はスレッドの作成時にスレッドに書き込まれます。スレッドのスコープ内で使用できますが、スレッドの外で共有することはできません。

しかし、場合によっては、スレッドのローカル変数に他のスレッドまたはメインスレッドからアクセスする必要がある場合はどうすればよいでしょうか? Ruby は、スレッドをハッシュ テーブルとして扱うのと同様に、名前でスレッド変数を作成する方法を提供します。 []= を介してデータを書き込み、[] を介してデータを読み取ります。次のコードを見てみましょう:

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

上記のコードの出力は次のとおりです:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

メインスレッドはサブスレッドの実行が完了するまで待機し、各値を個別に出力します。 。


スレッドの優先度

スレッドの優先度は、スレッドのスケジュールに影響を与える主な要素です。他の要因には、CPU が占有する実行時間の長さ、スレッドのグループ化のスケジューリングなどが含まれます。

Thread.priority メソッドを使用してスレッドの優先度を取得し、Thread.priority= メソッドを使用してスレッドの優先度を調整できます。

デフォルトでは、スレッドの優先順位は 0 です。 優先度が高いものはより速く実行されます。

スレッドは独自のスコープ内のすべてのデータにアクセスできますが、スレッド内の他のスレッドのデータにアクセスする必要がある場合はどうすればよいでしょうか? Thread クラスは、スレッド データが相互にアクセスするためのメソッドを提供します。スレッドをハッシュ テーブルとして使用するだけで、[]= を使用して任意のスレッドでデータを読み取ることができます。

athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}" }

スレッドをハッシュテーブルとして扱い、[]メソッドと[]=メソッドを使用することで、スレッド間のデータ共有を実現していることがわかります。


スレッド相互排他

Mutex (Mutal Exclusion = 相互排他ロック) は、2 つのスレッドが同じパブリック リソース (グローバル変数など) を同時に読み書きできないようにするためにマルチスレッド プログラミングで使用されるメカニズムです。

Mutaxを使用しないインスタンス

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

上記の例の出力結果は次のとおりです:

count1 :  9712487
count2 :  12501239
difference : 0

Mutaxを使用したインスタンス

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
    end
end
spy = Thread.new do
   loop do
       mutex.synchronize do
          difference += (count1 - count2).abs
       end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

上記の例の出力結果は次のとおりです:

count1 :  1336406
count2 :  1336406
difference : 0

Deadlock

2つ以上のコンピューティングユニット、両側両方の当事者がシステム リソースを取得するために相手方が実行を停止するのを待っているが、いずれの当事者も早期に終了しない場合、この状況はデッドロックと呼ばれます。

たとえば、プロセス p1 はモニターを占有しているので同時にプリンターを使用する必要があり、プリンターはプロセス p2 によって占有されており、p2 はモニターを使用する必要があるため、デッドロックが形成されます。

Mutex オブジェクトを使用するときは、スレッドのデッドロックに注意する必要があります。

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

上記の例の出力結果は次のとおりです:

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

Threadクラスメソッド

完全なThreadクラスメソッドは次のとおりです:

1314

スレッドのインスタンス化メソッド

次の例では、スレッドのインスタンス化メソッド join を呼び出します:

#!/usr/bin/ruby

thr = Thread.new do   # 实例化
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # 调用实例化方法 join

以下は、インスタンス化メソッドの完全なリストです:

シリアル番号メソッドの説明
1Thread.abort_on_Exception
その値が true の場合、スレッドが例外により終了すると、インタープリタ全体が中断されます。デフォルト値は false です。これは、通常の状況では、スレッドで例外が発生し、その例外が Thread#join などによって検出されなかった場合、スレッドは警告なしで終了されることを意味します。
2Thread.abort_on_Exception=
trueに設定すると、スレッドが例外により終了すると、インタープリタ全体が中断されます。新しいステータスを返します
3Thread.critical
ブール値を返します。
4Thread.critical=
その値が true の場合、スレッドの切り替えは発生しません。現在のスレッドがハング (停止) するか、シグナルが介入すると、その値は自動的に false に変更されます。
5Thread.current
現在実行中のスレッド(現在のスレッド)を返します。
6Thread.exit
現在のスレッドの実行を終了します。現在のスレッドを返します。現在のスレッドが唯一のスレッドである場合、その操作を終了するために exit(0) が使用されます。
7Thread.fork { block }
スレッドを生成する Thread.new と同じ。
8Thread.kill( aThread )
スレッドの実行を終了します。
9Thread.list
実行中または一時停止状態のライブスレッドの配列を返します。
10Thread.main
メインスレッドに戻ります。
11Thread.new( [ arg ]* ) {| args | block }
スレッドを生成し、実行を開始します。数値は変更されずにブロックに渡されます。これにより、スレッドの起動時にスレッド固有のローカル変数に値を渡すことができます。
12Thread.pass
は、実行中のスレッドの状態を変更しませんが、他の実行可能なスレッドに制御を与えます (明示的なスレッド スケジューリング)。
Thread.start( [ args ]* ) {| args | block } スレッドを生成し、実行を開始します。数値は変更されずにブロックに渡されます。これにより、スレッドの起動時にスレッド固有のローカル変数に値を渡すことができます。
Thread.stop は、他のスレッドが run メソッドを使用してスレッドを再びウェイクアップするまで、現在のスレッドを一時停止します。
現在のスレッドの安全レベルは $SAFE と同じです。文字列「run」、「sleep」または「aborting」を使用します。ライブスレッドのステータスを表します。スレッドが正常に終了した場合は false を返し、例外により終了した場合は nil を返します。 スレッドが終了状態(dead)またはサスペンド状態(stop)の場合、trueを返します。 自分自身になるまで待機しますスレッドが終了した後 (結合に相当)、スレッドのブロックの戻り値が返されます。スレッドの実行中に例外が発生した場合は、再度例外が発生します。 停止状態 (停止) のスレッドの状態を実行可能状態 (実行) に変更します。このメソッドがデッドスレッドで実行されると、ThreadError 例外が発生します。
シリアル番号メソッドの説明
1thr[ name ]
スレッド内のnameに対応する固有データを取得します。 name には文字列または記号を指定できます。 nameに該当するデータが無い場合はnilを返します。
2thr[ name ] =
スレッド内の名前に対応する固有データの値を設定します。名前は文字列または記号です。 nil に設定すると、スレッド内の対応するデータが削除されます。
3thr.abort_on_Exception
ブール値を返します。
4thr.abort_on_Exception=
その値が true の場合、スレッドが例外により終了すると、インタープリター全体が中断されます。
5thr.alive?
スレッドが「生きている」場合、trueを返します。
6thr.exit
スレッドの実行を終了します。自分を返しなさい。
7thr.join
は、自身のスレッドが終了するまで現在のスレッドを一時停止します。例外により自身が終了した場合、同じ例外が現在のスレッドによってスローされます。
8thr.key?
name に対応するスレッド固有のデータが定義されている場合、 true
9thr.kill
Thread.exit と同様。
10thr.priority
優先度のデフォルト値は0です。値が大きいほど優先度が高くなります。
11thr.priority=
スレッドの優先度を負の数値に設定することもできます。
12thr.raise( anException )
このスレッドで強制的に例外を発生させます。
13 thr .run
ウェイクアップとは異なり、このメソッドが停止したプロセスで使用されると、ThreadError 例外が発生します。 safe_level
15
thr.status
16
thr.stop?
17
thr.value