Ruby 多線程


每個正在系統上執行的程式都是一個行程。每個行程包含一到多個執行緒。

執行緒是程式中一個單一的順序控制流程,在單一程式中同時執行多個執行緒完成不同的工作,稱為多執行緒。

Ruby 中我們可以透過 Thread 類別來建立多線程,Ruby的線程是一個輕量級的,可以以高效的方式來實現並行的程式碼。


建立Ruby 執行緒

要啟動一個新的線程,只需要呼叫Thread.new 即可:

# 线程 #1 代码部分
Thread.new {
  # 线程 #2 执行代码
}
# 线程 #1 执行代码

實例

以下實例展示如何在Ruby程式中使用多執行緒:

#!/usr/bin/ruby

def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end

def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end

puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"


#以上程式碼執行結果為:

Started At Wed May 14 08:21:54 -0700 2014
func1 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:55 -0700 2014
func1 at: Wed May 14 08:21:56 -0700 2014
func2 at: Wed May 14 08:21:56 -0700 2014
func1 at: Wed May 14 08:21:58 -0700 2014
End at Wed May 14 08:22:00 -0700 2014

執行緒生命週期

1 、執行緒的建立可以使用Thread.new,同樣可以以同樣的語法使用Thread.start 或Thread.fork這三個方法來建立執行緒。

2、建立執行緒後無需啟動,執行緒會自動執行。

3、Thread 類別定義了一些方法來操控執行緒。執行緒執行Thread.new中的程式碼區塊。

4、執行緒程式碼區塊中最後一個語句是執行緒的值,可以透過執行緒的方法來調用,如果執行緒執行完畢,則傳回執行緒值,否則不回傳值直到執行緒執行完畢。

5、Thread.current 方法傳回表示目前執行緒的物件。 Thread.main 方法傳回主執行緒。

6、透過 Thread.Join 方法來執行線程,這個方法會掛起主線程,直到目前執行緒執行完畢。


執行緒狀態

執行緒有5種狀態:

# #退出aborting正常終止false發生異常終止nil#

線程和異常

當某線程發生異常,且沒有被rescue捕捉到時,該線程通常會被無警告地終止。但是,若有其它線程因為Thread#join的關係一直等待該線程的話,則等待的線程同樣會被引發相同的異常。

begin
  t = Thread.new do
    Thread.pass    # 主线程确实在等join
    raise "unhandled exception"
  end
  t.join
rescue
  p $!  # => "unhandled exception"
end

使用下列3個方法,就可以讓解譯器在某個執行緒因異常而終止時中斷運作。

  • 啟動腳本時指定-d選項,並以偵錯模時執行。

  • Thread.abort_on_exception設定標誌。

  • 使用Thread#abort_on_exception對指定的執行緒設定標誌。

當使用上述3種方法之一後,整個解釋器就會中斷。

t = Thread.new { ... }
t.abort_on_exception = true

執行緒同步控制

在Ruby中,提供三種實作同步的方式,分別是:

1. 透過Mutex類別實作執行緒同步

2. 監管資料交接的Queue類別實作執行緒同步

3. 使用ConditionVariable實作同步控制

透過Mutex類別實作執行緒同步

##透過Mutex類別實現執行緒同步控制,如果在多個執行緒鐘同時需要一個程式變量,可以將這個變數部分使用lock鎖定。 程式碼如下:

#!/usr/bin/ruby

require "thread"
puts "Synchronize Thread"

@num=200
@mutex=Mutex.new

def buyTicket(num)
     @mutex.lock
          if @num>=num
               @num=@num-num
               puts "you have successfully bought #{num} tickets"
          else
               puts "sorry,no enough tickets"
          end
     @mutex.unlock
end

ticket1=Thread.new 10 do
     10.times do |value|
     ticketNum=15
     buyTicket(ticketNum)
     sleep 0.01
     end
end

ticket2=Thread.new 10 do
     10.times do |value|
     ticketNum=20
     buyTicket(ticketNum)
     sleep 0.01
     end
end

sleep 1
ticket1.join
ticket2.join


以上程式碼執行結果為:

Synchronize Thread
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets

除了使用lock鎖定變量,還可以使用try_lock鎖定變量,也可以使用Mutex. synchronize同步對某一個變數的存取。

監管資料交接的Queue類別實作執行緒同步

Queue類別就是表示一個支援執行緒的佇列,能夠同步對佇列末端進行存取。不同的執行緒可以使用統一個對類,但是不用擔心這個佇列中的資料是否能夠同步,另外使用SizedQueue類別能夠限制佇列的長度

SizedQueue類別能夠非常便捷的幫助我們開發執行緒同步的應用程序,應為只要加入到這個佇列中,就不用關心執行緒的同步問題。

經典的生產者消費者問題:

#!/usr/bin/ruby

require "thread"
puts "SizedQuee Test"

queue = Queue.new

producer = Thread.new do
     10.times do |i|
          sleep rand(i) # 让线程睡眠一段时间
          queue << i
          puts "#{i} produced"
     end
end

consumer = Thread.new do
     10.times do |i|
          value = queue.pop
          sleep rand(i/2)
          puts "consumed #{value}"
     end
end

consumer.join


程式的輸出:

SizedQuee Test
0 produced
1 produced
consumed 0
2 produced
consumed 1
consumed 2
3 produced
consumed 34 produced

consumed 4
5 produced
consumed 5
6 produced
consumed 6
7 produced
consumed 7
8 produced
9 produced
consumed 8
consumed 9


執行緒變數

線程可以有其私有變量,線程的私有變數會在線程創建的時候寫入線程。可以被線程範圍內使用,但是不能被線程外部共享。

但是有時候,執行緒的局部變數需要別別的執行緒或主執行緒存取怎麼辦? ruby當中提供了允許透過名字來創建線程變量,類似的把線程看做hash式的散列表。透過[]=寫入並透過[]讀出資料。讓我們來看看下面的程式碼:

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

以上程式碼執行輸出結果為:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主執行緒等待子執行緒執行完成,然後分別輸出每個值。 。


執行緒優先權

執行緒的優先權是影響執行緒的調度的主要因素。其他因素包括佔用CPU的執行時間長短,執行緒分組調度等等。

可以使用 Thread.priority 方法得到執行緒的優先權和使用 Thread.priority= 方法來調整執行緒的優先權。

執行緒的優先權預設為 0 。 優先順序較高的執行的要快。

一個 Thread 可以存取自己作用域內的所有數據,但如果有需要在某個執行緒內存取其他執行緒的資料應該怎麼做呢? Thread 類別提供了執行緒資料互相存取的方法,你可以簡單的把一個執行緒當作一個 Hash 表,可以在任何執行緒內使用 []= 寫入數據,使用 [] 讀出資料。

athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}" }

可以看到,把執行緒當作一個 Hash 表,使用 [] 和 []= 方法,我們實作了執行緒之間的資料共享。


執行緒互斥

Mutex(Mutal Exclusion = 互斥鎖)是一種用於多執行緒程式設計中,防止兩個執行緒同時對同一公共資源(例如全域變量)進行讀寫的機制。

不使用Mutax的實例

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

以上實例執行輸出結果為:

count1 :  9712487
count2 :  12501239
difference : 0

使用Mutax的實例

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
    end
end
spy = Thread.new do
   loop do
       mutex.synchronize do
          difference += (count1 - count2).abs
       end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

以上實例執行輸出結果為:

count1 :  1336406
count2 :  1336406
difference : 0

死鎖

兩個以上的運算單元,雙方都在等待對方停止運行,以獲取系統資源,但是沒有一方提前退出時,這種狀況,就稱為死鎖。

例如,一個流程 p1佔用了顯示器,同時又必須使用印表機,而印表機被進程p2佔用,p2又必須使用顯示器,這樣就形成了死鎖。

當我們在使用 Mutex 物件時需要注意線程死鎖。

實例

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

以上實例輸出結果為:

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

執行緒類別方法

完整的 Thread(執行緒) 類別方法如下:

#執行緒狀態傳回值
可執行run
#睡眠Sleeping
序號方法描述
#1Thread.abort_on_exception
#若其值為真的話,一旦某線程因異常而終止時,整個解釋器就會被中斷。它的預設值是假,也就是說,在通常情況下,若某執行緒發生異常且該異常未被Thread#join等偵測到時,該執行緒會被無警告地終止。
2Thread.abort_on_exception=
如果設定為true, 一旦某執行緒因例外狀況而終止時,整個解釋器就會中斷。傳回新的狀態
3Thread.critical
傳回布林值。
4Thread.critical=
當其值為true時,將不會進行執行緒切換。若當前執行緒掛起(stop)或有訊號(signal)介入時,其值將自動變成false。
5Thread.current
#傳回目前執行中的執行緒(目前執行緒)。
6Thread.exit
#終止目前執行緒的執行。返回當前線程。若當前執行緒是唯一的一個執行緒時,將使用exit(0)來終止它的運行。
7Thread.fork { block }
與 Thread.new 一樣產生執行緒。
8Thread.kill( aThread )
終止執行緒的執行.
9Thread.list
傳回處於運作狀態或掛起狀態的活執行緒的陣列。
10Thread.main
#回傳主執行緒。
11Thread.new( [ arg ]* ) {| args | block }
產生執行緒,並開始執行。數值會原封不動地傳遞給區塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的局部變數。
12Thread.pass
#將運行權交給其他執行緒. 它不會改變運行中的執行緒的狀態,而是將控制權交給其他可運行的執行緒(顯式的執行緒調度)。
13Thread.start( [ args ]* ) {| args | block }
產生執行緒,並開始執行。數值會原封不動地傳遞給區塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的局部變數。
14Thread.stop
將目前執行緒掛起,直到其他執行緒使用run方法再次喚醒該執行緒。

執行緒實例化方法

以下實例呼叫了執行緒實例化方法 join:

#!/usr/bin/ruby

thr = Thread.new do   # 实例化
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # 调用实例化方法 join

以下是完整實例化方法清單:

序號方法描述
#1thr[ name ]
取出執行緒內與name相對應的固有資料。 name可以是字串或符號。 若沒有與name相對應的資料時, 返回nil。
2thr[ name ] =
設定執行緒內name相對應的固有資料的值, name可以是字串或符號。 若設為nil時, 將刪除該執行緒內對應資料。
3thr.abort_on_exception
傳回布林值。
4thr.abort_on_exception=
#若其值為true的話,一旦某線程因異常而終止時,整個解釋器就會被中斷。
5thr.alive?
若執行緒是"活"的,就回傳true。
6thr.exit
#終止執行緒的執行。返回self。
7thr.join
#掛起目前執行緒,直到self執行緒終止運行為止. 若self因例外狀況而終止時, 將會當前執行緒引發同樣的異常。
8thr.key?
若與name相對應的執行緒固有資料已經被定義的話,就回傳true
9thr.kill
#類似於Thread.exit
10thr.priority
傳回執行緒的優先權. 優先權的預設值為0. 該值越大則優先權越高.
11thr.priority=
#設定執行緒的優先權. 也可以設定為負數.
12thr.raise( anException )
在該執行緒內強行引發異常.
13thr.run
重新啟動被掛起(stop)的線程. 與wakeup不同的是,它將立即進行線程的切換. 若對死程序使用此方法時, 將引發ThreadError異常.
#14thr.safe_level
#返回self 的安全等級. 目前執行緒的safe_level與$SAFE相同.
15thr.status
使用字串"run"、" sleep"或"aborting" 來表示活線程的狀態. 若某線程是正常終止的話,就返回false. 若因異常而終止的話,就返回nil。
16thr.stop?
若執行緒處於終止狀態(dead)或被掛起(stop)時,返回true.
17thr.value
一直等到self執行緒終止運行(等同於join)後,傳回該執行緒的區塊的回傳值. 若在執行緒的執行過程中發生了異常, 就會再次引發該異常.
#18##thr.wakeup把被掛起(stop)的執行緒的狀態改為可執行狀態(run), 若對死執行緒執行該方法時,將會引發ThreadError例外。
#