Ruby multithreading


Every program running on the system is a process. Each process contains one or more threads.

A thread is a single sequential flow of control within a program. Running multiple threads at the same time to carry out different tasks within one program is called multithreading.

In Ruby, threads are created through the Thread class. Ruby threads are lightweight and offer an efficient way to write concurrent code.


Create Ruby thread

To start a new thread, just call Thread.new:

# Thread #1 code
Thread.new {
  # Thread #2 runs this code
}
# Thread #1 continues running here

Example

The following example shows how to use multithreading in a Ruby program:

#!/usr/bin/ruby

def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end

def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end

puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"


The execution result of the above code is:

Started At Wed May 14 08:21:54 -0700 2014
func1 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:55 -0700 2014
func1 at: Wed May 14 08:21:56 -0700 2014
func2 at: Wed May 14 08:21:56 -0700 2014
func1 at: Wed May 14 08:21:58 -0700 2014
End at Wed May 14 08:22:00 -0700 2014

Thread life cycle

1. Threads can be created with Thread.new. Thread.start and Thread.fork create threads with the same syntax.

2. A thread does not need to be started after it is created; it begins executing automatically.

3. The thread executes the code block passed to Thread.new. The Thread class defines methods for controlling threads.

4. The value of the last statement in the thread's code block is the thread's value, which can be retrieved with the thread's value method. If the thread has already finished, value returns immediately; otherwise it blocks until the thread finishes.

5. The Thread.current method returns the object representing the current thread. The Thread.main method returns the main thread.

6. Thread#join waits for a thread. It suspends the calling thread (often the main thread) until the thread it is called on finishes.
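The life cycle points above can be tried out with a minimal sketch like the following. The variable names (worker, started, forked) are illustrative and not part of the original article.

```ruby
# Thread.new starts the block immediately; extra arguments are passed to the block.
worker = Thread.new(6, 7) { |a, b| a * b }

# value waits for the thread to finish and returns the block's last expression.
product = worker.value
puts "product = #{product}"

# Thread.start and Thread.fork take a block in the same way as Thread.new.
started = Thread.start { Thread.current }
forked  = Thread.fork { :forked }

# Inside its own block, Thread.current is the thread object itself.
puts started.value.equal?(started)
puts forked.value.inspect

# join blocks until the thread has finished and then returns the thread.
puts worker.join.equal?(worker)
```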


Thread status

A thread has five possible states, reported by Thread#status:

Thread status          Return value
Runnable               "run"
Sleeping               "sleep"
Exiting                "aborting"
Normal termination     false
Abnormal termination   nil
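As a rough illustration, the sketch below collects several of these status values from live and finished threads. The statuses hash and the thread names are made up for this example, and report_on_exception is switched off only to keep the demo output quiet.

```ruby
statuses = {}

# The current thread is running while it asks for its own status.
statuses[:running] = Thread.current.status

# A thread blocked in sleep reports "sleep".
sleeper = Thread.new { sleep }
sleep 0.01 until sleeper.status == "sleep"
statuses[:sleeping] = sleeper.status
sleeper.kill
sleeper.join

# A thread whose block returned normally reports false.
finished = Thread.new { :ok }
finished.join
statuses[:finished] = finished.status

# A thread that died from an unhandled exception reports nil.
failed = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise "boom"
end
begin
  failed.join
rescue RuntimeError
  # join re-raises the thread's exception; it is ignored here on purpose
end
statuses[:failed] = failed.status

p statuses
```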

Threads and exceptions

When an exception is raised in a thread and not caught by rescue, the thread is normally terminated without interrupting the rest of the program (since Ruby 2.5 a report is also printed to standard error by default). However, if another thread is waiting for this thread in Thread#join, the same exception is re-raised in the waiting thread.

begin
  t = Thread.new do
    Thread.pass    # give the main thread a chance to reach join
    raise "unhandled exception"
  end
  t.join
rescue
  p $!  # => #<RuntimeError: unhandled exception>
end

Any of the following three methods makes the interpreter abort when a thread terminates because of an exception.

  • Start the script with the -d option to run it in debug mode.

  • Set the global flag with Thread.abort_on_exception = true.

  • Set the flag on a specific thread with Thread#abort_on_exception=.

When any of these is in effect, the entire interpreter is interrupted.

t = Thread.new { ... }
t.abort_on_exception = true
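One way to observe the flag in action is the sketch below. It assumes the script runs in the main thread and rescues the exception there so that the demo itself does not exit. The names worker and aborted_with are illustrative.

```ruby
aborted_with = nil

begin
  worker = Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet for the demo
    Thread.current.abort_on_exception = true   # per-thread flag
    raise ArgumentError, "worker failed"
  end
  sleep 5 # the main thread is interrupted here when the worker dies
rescue ArgumentError => e
  # With the flag set, the worker's exception is re-raised in the main thread.
  aborted_with = e.message
end

puts "main thread interrupted by: #{aborted_with.inspect}"
```

Without the rescue clause, the exception would propagate out of the main thread and end the program.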

Thread synchronization control

Ruby provides three ways to synchronize threads:

1. The Mutex class

2. The Queue class, which manages the handover of data between threads

3. The ConditionVariable class

Thread synchronization through the Mutex class

The Mutex class implements thread synchronization control. If several threads need the same variable at the same time, the code that accesses the variable can be protected with a lock. The code is as follows:

#!/usr/bin/ruby

require "thread"
puts "Synchronize Thread"

@num=200
@mutex=Mutex.new

def buyTicket(num)
     @mutex.lock
          if @num>=num
               @num=@num-num
               puts "you have successfully bought #{num} tickets"
          else
               puts "sorry,no enough tickets"
          end
     @mutex.unlock
end

ticket1=Thread.new 10 do
     10.times do |value|
     ticketNum=15
     buyTicket(ticketNum)
     sleep 0.01
     end
end

ticket2=Thread.new 10 do
     10.times do |value|
     ticketNum=20
     buyTicket(ticketNum)
     sleep 0.01
     end
end

sleep 1
ticket1.join
ticket2.join


The execution result of the above code is:

Synchronize Thread
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets
sorry,no enough tickets

In addition to lock, you can use try_lock, which attempts to acquire the lock without blocking, and Mutex#synchronize, which runs a block while holding the lock.
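The following is a small sketch of synchronize and try_lock, separate from the ticket example. The counter and the thread count are arbitrary choices for illustration.

```ruby
mutex = Mutex.new
counter = 0

# synchronize locks the mutex, runs the block, and unlocks it again,
# even if the block raises an exception.
workers = 4.times.map do
  Thread.new do
    1000.times do
      mutex.synchronize { counter += 1 }
    end
  end
end
workers.each(&:join)
puts "counter = #{counter}"

# try_lock never blocks: it returns false when another thread holds the lock.
mutex.lock
refused = Thread.new { mutex.try_lock }.value
mutex.unlock
puts "try_lock while locked elsewhere: #{refused}"

# When the mutex is free, try_lock acquires it and returns true.
acquired = mutex.try_lock
mutex.unlock if acquired
puts "try_lock while free: #{acquired}"
```

Compared with a lock/unlock pair, synchronize is usually the safer choice because the lock cannot be left held by accident.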

Thread synchronization through the Queue class

The Queue class represents a thread-safe queue whose ends can be accessed synchronously. Different threads can share the same queue without worrying about whether the data in it stays consistent. In addition, the SizedQueue class can limit the length of the queue.

The SizedQueue class makes it very convenient to develop synchronized multithreaded applications: as long as data goes through the queue, the program does not need to handle thread synchronization itself.

Classic producer-consumer problem:

#!/usr/bin/ruby

require "thread"
puts "Queue Test"

queue = Queue.new

producer = Thread.new do
     10.times do |i|
          sleep rand(i) # let the thread sleep for a while
          queue << i
          puts "#{i} produced"
     end
end

consumer = Thread.new do
     10.times do |i|
          value = queue.pop
          sleep rand(i/2)
          puts "consumed #{value}"
     end
end

consumer.join


Output of the program:

Queue Test
0 produced
1 produced
consumed 0
2 produced
consumed 1
consumed 2
3 produced
consumed 34 produced

consumed 4
5 produced
consumed 5
6 produced
consumed 6
7 produced
consumed 7
8 produced
9 produced
consumed 8
consumed 9
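The listing above uses an unbounded Queue. As a sketch of the SizedQueue variant mentioned earlier, the following limits the queue to two items, so the producer blocks whenever the queue is full. The capacity and item count are arbitrary, and closing the queue is just one possible way to tell the consumer that no more items will arrive.

```ruby
queue = SizedQueue.new(2) # holds at most two items at a time
consumed = []

producer = Thread.new do
  5.times { |i| queue.push(i) } # push blocks while the queue is full
  queue.close                   # signal that no more items will arrive
end

consumer = Thread.new do
  # pop blocks while the queue is empty and returns nil
  # once the queue is both closed and empty.
  while (item = queue.pop)
    consumed << item
  end
end

[producer, consumer].each(&:join)
puts "capacity: #{queue.max}"
puts "consumed: #{consumed.inspect}"
```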

Thread variables

Threads can have their own private variables. A thread's private variables are bound to the thread when it is created. They can be used within the thread's scope but cannot be shared outside the thread.

Sometimes, however, a thread's local variables need to be accessed by other threads or by the main thread. Ruby lets you create thread variables by name, treating the thread much like a hash table: write data with []= and read it with []. Consider the following code:

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

The output result of the above code is:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

The main thread waits for each child thread to finish and then prints its value.
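A deterministic variation on the same idea is sketched below. Each thread stores a value under the illustrative key :square, and the main thread reads the values back after joining.

```ruby
# Each thread writes to its own named slot with []=.
threads = 3.times.map do |i|
  Thread.new { Thread.current[:square] = i * i }
end

# After join, another thread can read the slot with [].
squares = threads.map { |t| t.join; t[:square] }
puts squares.inspect

# The slot belongs to each thread; the main thread never set it.
puts Thread.current[:square].inspect
puts threads.first.key?(:square)
```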


Thread priority

Thread priority is a major factor in thread scheduling. Other factors include how long a thread has occupied the CPU, thread group scheduling, and so on.

Use Thread#priority to get a thread's priority and Thread#priority= to change it.

A thread's priority defaults to 0. Higher-priority threads are scheduled to run more often than lower-priority ones; the priority is only a hint to the scheduler.
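Reading and changing a priority might look like the sketch below. The value 2 is an arbitrary choice, and the effect on actual scheduling depends on the Ruby implementation and platform.

```ruby
worker = Thread.new { sleep } # an idle thread to inspect

# A new thread starts with the priority of the thread that created it
# (0 for threads created from an unmodified main thread).
default_priority = worker.priority
puts "default priority: #{default_priority}"

# priority= adjusts the scheduling hint for this thread.
worker.priority = 2
raised_priority = worker.priority
puts "raised priority: #{raised_priority}"

worker.kill
worker.join
```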

A thread can access all data within its own scope, but sometimes one thread needs to access data belonging to another thread. The Thread class provides a way to do this: a thread can be used like a hash table. Any thread can write data with []= and read it with [].

athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}" }

As you can see, by treating the thread as a hash table and using the [] and []= methods, we can share data between threads.


Thread mutual exclusion

A mutex (mutual exclusion lock) is a mechanism used in multithreaded programming to prevent two threads from reading and writing the same shared resource (such as a global variable) at the same time.

Example without Mutex

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

The output of the above example is:

count1 :  9712487
count2 :  12501239
difference : 0

Example using Mutex

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

The output of the above example is:

count1 :  1336406
count2 :  1336406
difference : 0

Deadlock

A deadlock occurs when two or more units of execution are each waiting for another to release a system resource, and none of them gives up first.

For example, process p1 holds the monitor and also needs the printer, while the printer is held by process p2, which in turn needs the monitor. Neither can proceed, so a deadlock forms.

Watch out for thread deadlocks when using Mutex objects.
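Two small sketches related to the monitor and printer scenario follow. The first shows that Ruby refuses to let a thread lock a Mutex it already holds. The second shows one common way to avoid the p1/p2 situation, which is to have every thread take the locks in the same order. The names monitor_lock, printer_lock and jobs_done are hypothetical.

```ruby
# 1. Locking a Mutex twice from the same thread raises ThreadError
#    instead of hanging forever.
mutex = Mutex.new
relock_error = nil
mutex.synchronize do
  begin
    mutex.lock # this thread already owns the lock
  rescue ThreadError => e
    relock_error = e
  end
end
puts "#{relock_error.class}: #{relock_error.message}"

# 2. Consistent lock ordering: both workers take monitor_lock first and
#    printer_lock second, so neither can hold one lock while waiting
#    for a thread that holds the other.
monitor_lock = Mutex.new
printer_lock = Mutex.new
jobs_done = []

workers = %w[p1 p2].map do |name|
  Thread.new do
    monitor_lock.synchronize do
      printer_lock.synchronize { jobs_done << name }
    end
  end
end
workers.each(&:join)
puts "finished: #{jobs_done.sort.inspect}"
```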

Example

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

The output result of the above example is:

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!
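In the example above, thread A would wait forever if B happened to call signal before A reached wait. A common way to guard against that is to pair the condition variable with a flag and wait in a loop. The sketch below assumes a flag named ready, which is not part of the original example.

```ruby
mutex = Mutex.new
ready_cv = ConditionVariable.new
ready = false
log = []

waiter = Thread.new do
  mutex.synchronize do
    # Re-check the flag around wait so that a signal sent before this
    # thread started waiting is not lost.
    ready_cv.wait(mutex) until ready
    log << "waiter saw ready"
  end
end

signaler = Thread.new do
  mutex.synchronize do
    ready = true
    log << "signaler set ready"
    ready_cv.signal
  end
end

[waiter, signaler].each(&:join)
puts log
```

Whichever thread acquires the mutex first, the waiter only proceeds after the signaler has set the flag.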

Thread class methods

The complete list of Thread class methods is as follows:

No.  Method and description

1    Thread.abort_on_exception
     If true, the entire interpreter is interrupted as soon as any thread terminates because of an exception. The default is false, which means that if an exception occurs in a thread and is not detected through Thread#join or similar, only that thread is terminated.

2    Thread.abort_on_exception=
     If set to true, the entire interpreter is interrupted as soon as any thread terminates because of an exception. Returns the new state.

3    Thread.critical
     Returns a Boolean value. (Ruby 1.8 only.)

4    Thread.critical=
     When set to true, no thread switching occurs. If the current thread is suspended (stop) or a signal intervenes, the value automatically changes back to false. (Ruby 1.8 only.)

5    Thread.current
     Returns the currently running thread (the current thread).

6    Thread.exit
     Terminates the current thread and returns it. If the current thread is the only thread, exit(0) is used to terminate it.

7    Thread.fork { block }
     Creates a thread in the same way as Thread.new.

8    Thread.kill( aThread )
     Terminates the given thread.

9    Thread.list
     Returns an array of the live threads that are running or suspended.

10   Thread.main
     Returns the main thread.

11   Thread.new( [ arg ]* ) {| args | block }
     Creates a thread and starts executing it. The arguments are passed unchanged to the block, which makes it possible to hand values to the thread's own local variables when it starts.

12   Thread.pass
     Hands execution over to other threads. It does not change the state of the running thread; instead, control passes to other runnable threads (explicit thread scheduling).

13   Thread.start( [ args ]* ) {| args | block }
     Creates a thread and starts executing it. The arguments are passed unchanged to the block, which makes it possible to hand values to the thread's own local variables when it starts.

14   Thread.stop
     Suspends the current thread until another thread wakes it with the run method.
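A few of these class methods can be exercised together, as in the sketch below. The thread names and the polling interval are illustrative choices.

```ruby
worker = Thread.new do
  Thread.stop # suspend this thread until another thread calls run or wakeup
  :resumed
end

# Wait until the worker has actually suspended itself.
sleep 0.01 until worker.stop?

# Thread.list includes suspended threads as well as running ones.
listed_while_stopped = Thread.list.include?(worker)
puts "listed while stopped: #{listed_while_stopped}"

# Thread.pass only offers other runnable threads a turn; the worker stays stopped.
Thread.pass

# run wakes the suspended thread; value then returns the block's result.
worker.run
result = worker.value
puts "result: #{result.inspect}"

# Thread.kill terminates the thread it is given.
victim = Thread.new { sleep }
Thread.kill(victim)
victim.join
victim_alive = victim.alive?
puts "victim alive: #{victim_alive}"
```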

Thread instance methods

The following example calls the thread instance method join:

#!/usr/bin/ruby

thr = Thread.new do   # create the thread
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # call the instance method join

The following is the complete list of instance methods:

No.  Method and description

1    thr[ name ]
     Returns the thread-specific data stored under name. name can be a string or a symbol. If there is no data for name, nil is returned.

2    thr[ name ] =
     Sets the thread-specific data stored under name. name can be a string or a symbol. Setting it to nil deletes the corresponding data from the thread.

3    thr.abort_on_exception
     Returns a Boolean value.

4    thr.abort_on_exception=
     If set to true, the entire interpreter is interrupted as soon as this thread terminates because of an exception.

5    thr.alive?
     Returns true if the thread is alive.

6    thr.exit
     Terminates the thread. Returns self.

7    thr.join
     Suspends the current thread until the self thread terminates. If self terminates because of an exception, the same exception is raised in the current thread.

8    thr.key?( name )
     Returns true if thread-specific data has been defined for name.

9    thr.kill
     Similar to thr.exit.

10   thr.priority
     Returns the priority of the thread. The default priority is 0. The larger the value, the higher the priority.

11   thr.priority=
     Sets the priority of the thread. It can also be set to a negative number.

12   thr.raise( anException )
     Forcibly raises an exception in this thread.

13   thr.run
     Restarts a suspended (stop) thread. Unlike wakeup, it switches threads immediately. Calling this method on a dead thread raises a ThreadError exception.

14   thr.safe_level
     Returns the safe level of self. The safe_level of the current thread is the same as $SAFE.

15   thr.status
     Returns the string "run", "sleep" or "aborting" to indicate the status of a live thread. Returns false if the thread terminated normally and nil if it terminated because of an exception.

16   thr.stop?
     Returns true if the thread is dead or suspended (stop).

17   thr.value
     Waits until the self thread terminates (equivalent to join), then returns the return value of the thread's block. If an exception occurred while the thread was running, the exception is raised again.

18   thr.wakeup
     Changes the status of a suspended (stop) thread to runnable (run). Calling this method on a dead thread raises a ThreadError exception.
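The sketch below exercises several of these instance methods (key?, alive?, wakeup, value and raise). The key :task, the return value 42 and the error message are made up for the example.

```ruby
worker = Thread.new do
  Thread.current[:task] = "demo"
  sleep # sleep until another thread wakes this one
  42
end
sleep 0.01 until worker.status == "sleep"

has_task = worker.key?(:task)  # thread-specific data was defined
alive_before = worker.alive?   # a sleeping thread is still alive

worker.wakeup                  # mark the sleeping thread as runnable again
answer = worker.value          # wait for it and fetch the block's result
alive_after = worker.alive?

puts "key?: #{has_task}, alive before: #{alive_before}, alive after: #{alive_after}"
puts "value: #{answer}"

# raise injects an exception into another thread; join then re-raises it here.
interrupted = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  sleep
end
sleep 0.01 until interrupted.status == "sleep"
interrupted.raise(RuntimeError, "stop now")

injected_error =
  begin
    interrupted.join
    nil
  rescue RuntimeError => e
    e
  end
puts "join re-raised: #{injected_error.message}"
```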