Java multi-threaded concurrent collaborative producer-consumer design pattern
Two threads, one producer and one consumer
Scenario
There are two threads: one is responsible for production and the other for consumption. The producer produces one resource, and the consumer then consumes it.
Issues involved
Synchronization: how to guarantee the integrity of the same resource when multiple threads access it concurrently. The usual approach is a mark (flag) or a locking mechanism. wait() and notify() are methods of the base class Object, so every Java object has them and any object can be used to implement a synchronization mechanism.
wait() method: when the buffer is full (for the producer) or empty (for the consumer), the thread stops its own execution, releases the lock, and enters the waiting state so that other threads can run.
notify() method: after the producer puts a product into the buffer, or the consumer takes one out, it wakes up one other thread waiting on the same object. notify() does not release the lock itself; the woken thread can continue only after the notifying thread leaves its synchronized method or block. Both wait() and notify() must be called by a thread that holds the object's lock.
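The two rules above can be tried out in a small sketch. The class name WaitNotifyBasics and its methods are made up for illustration and are not part of the original code. The sketch assumes nothing beyond the standard Object methods: calling notify() without holding the object's lock throws IllegalMonitorStateException, and a thread blocked in wait() has released the lock, so another thread can enter a synchronized block on the same object and wake it.

```java
class WaitNotifyBasics {
    private final Object lock = new Object();
    private boolean ready = false; // condition the waiting thread is waiting for

    /** Returns true if notify() outside a synchronized block is rejected. */
    static boolean notifyWithoutLockFails() {
        Object o = new Object();
        try {
            o.notify(); // the calling thread does not hold o's lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Starts a waiting thread, wakes it, and returns true if it finished. */
    boolean handOff() {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!ready) {
                    try {
                        lock.wait(); // releases the lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        waiter.start();
        synchronized (lock) { // can be entered even while waiter is inside wait()
            ready = true;
            lock.notify();    // waiter resumes only after this block is left
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("notify without lock rejected: " + notifyWithoutLockFails());
        System.out.println("waiter finished: " + new WaitNotifyBasics().handOff());
    }
}
```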
Code implementation (three classes in total, plus a test class with a main method)
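Resource.java
/**
 * Created by yuandl on 2016-10-11.
 * Resource
 */
public class Resource {
    /* Resource serial number */
    private int number = 0;
    /* Resource mark: true once a resource has been produced */
    private boolean flag = false;

    /**
     * Produce a resource
     */
    public synchronized void create() {
        if (flag) { // Check the mark first; if a resource has already been produced, wait for it to be consumed
            try {
                wait(); // Make the producer thread wait
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++; // Produce one
        // "生产者" means "producer"
        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
        flag = true; // Mark the resource as produced
        notify(); // Wake up a thread that is waiting to operate on the resource
    }

    /**
     * Consume a resource
     */
    public synchronized void destroy() {
        if (!flag) { // Nothing has been produced yet, so wait
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // "消费者" means "consumer"
        System.out.println(Thread.currentThread().getName() + "消费者****" + number);
        flag = false; // Mark the resource as consumed
        notify();
    }
}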
Producer.java
/**
 * Created by yuandl on 2016-10-11.
 * Producer
 */
public class Producer implements Runnable {
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.create();
        }
    }
}
Consumer.java
/**
 * Consumer
 */
public class Consumer implements Runnable {
    private Resource resource;

    public Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.destroy();
        }
    }
}
ProducerConsumerTest.java
/**
 * Created by yuandl on 2016-10-11.
 */
public class ProducerConsumerTest {
    public static void main(String[] args) {
        Resource resource = new Resource();
        new Thread(new Producer(resource)).start(); // Producer thread
        new Thread(new Consumer(resource)).start(); // Consumer thread
    }
}
Output (生产者 = producer, 消费者 = consumer):
Thread-0生产者------------1
Thread-1消费者****1
Thread-0生产者------------2
Thread-1消费者****2
Thread-0生产者------------3
Thread-1消费者****3
Thread-0生产者------------4
Thread-1消费者****4
Thread-0生产者------------5
Thread-1消费者****5
Thread-0生产者------------6
Thread-1消费者****6
Thread-0生产者------------7
Thread-1消费者****7
Thread-0生产者------------8
Thread-1消费者****8
Thread-0生产者------------9
Thread-1消费者****9
Thread-0生产者------------10
Thread-1消费者****10
Multiple threads, multiple producers and multiple consumers
Scenario
There are four threads: two are responsible for production and two for consumption. Each time a producer produces one resource, a consumer consumes it.
Issues involved
notifyAll() method: after the producer puts a product into the buffer, or the consumer takes one out, it wakes up all other threads waiting on the same object. Like notify(), it does not release the lock itself; the woken threads compete for the lock once the notifying thread leaves its synchronized method.
Test the code again
ProducerConsumerTest.java
/**
 * Created by yuandl on 2016-10-11.
 */
public class ProducerConsumerTest {
    public static void main(String[] args) {
        Resource resource = new Resource();
        new Thread(new Producer(resource)).start(); // Producer thread
        new Thread(new Producer(resource)).start(); // Producer thread
        new Thread(new Consumer(resource)).start(); // Consumer thread
        new Thread(new Consumer(resource)).start(); // Consumer thread
    }
}
Output:
Thread-0生产者------------100
Thread-3消费者****100
Thread-0生产者------------101
Thread-3消费者****101
Thread-2消费者****101
Thread-1生产者------------102
Thread-3消费者****102
Thread-0生产者------------103
Thread-2消费者****103
Thread-1生产者------------104
Thread-3消费者****104
Thread-1生产者------------105
Thread-0生产者------------106
Thread-2消费者****106
Thread-1生产者------------107
Thread-3消费者****107
Thread-0生产者------------108
Thread-2消费者****108
Thread-0生产者------------109
Thread-2消费者****109
Thread-1生产者------------110
Thread-3消费者****110
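Two errors appear in this output:
101 was produced once but consumed twice (by Thread-3 and then by Thread-2).
105 was produced but never consumed (106 was produced right after it).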
Cause analysis
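With an if check, a thread tests the mark only once, before it calls wait(). When it is woken up later, it continues from the line after wait() without testing the mark again, even though another thread may have changed the mark in the meantime. For example, both consumers can be waiting when a producer creates resource 101 and calls notify(). One consumer wakes up, consumes 101, and its own notify() wakes the other consumer instead of a producer. The second consumer does not re-check the mark, so it consumes 101 again. In the same way, a producer woken by the other producer creates a new resource on top of one that has not been consumed yet, which is how 105 was lost.
In short, checking the mark only once lets threads run that should not run, and the data becomes wrong.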
Solution
Replace the if check with a while loop. A thread that is woken up and regains the lock then tests the mark again before it continues, and goes back to wait() if the mark says it should not run yet. In other words, the mark must be re-checked every time wait() returns.
Code improvement (if -> while in Resource)
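The effect of re-checking the mark can be seen in isolation with a small sketch. The class GuardedWaitDemo and its methods awaitOpen, nudge, and open are hypothetical names chosen for this illustration and do not appear in the original code. A waiting thread is first woken without the mark being changed, then woken again after the mark is set; with the while loop it only gets past wait() the second time.

```java
class GuardedWaitDemo {
    private boolean open = false;            // the mark being guarded
    private volatile boolean passed = false; // set once the waiter gets through

    synchronized void awaitOpen() throws InterruptedException {
        while (!open) { // re-check the mark after every wake-up
            wait();
        }
        passed = true;
    }

    synchronized void nudge() { // wakes waiting threads without changing the mark
        notifyAll();
    }

    synchronized void open() { // changes the mark, then wakes waiting threads
        open = true;
        notifyAll();
    }

    /** Returns {passed after nudge(), passed after open()}. */
    static boolean[] run() {
        GuardedWaitDemo gate = new GuardedWaitDemo();
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            Thread.sleep(100); // give the waiter time to reach wait()
            gate.nudge();
            Thread.sleep(100);
            boolean afterNudge = gate.passed; // mark unchanged, waiter is back in wait()
            gate.open();
            waiter.join(5000);
            boolean afterOpen = gate.passed;
            return new boolean[] {afterNudge, afterOpen};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("passed after nudge: " + result[0]);
        System.out.println("passed after open:  " + result[1]);
    }
}
```

If the while in awaitOpen() were an if, the nudge() call alone would be enough to let the waiter through, which is the same kind of error as consuming a resource that was never produced.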
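Resource.java
/**
 * Created by yuandl on 2016-10-11.
 * Resource
 */
public class Resource {
    /* Resource serial number */
    private int number = 0;
    /* Resource mark: true once a resource has been produced */
    private boolean flag = false;

    /**
     * Produce a resource
     */
    public synchronized void create() {
        while (flag) { // Check the mark first; if a resource has already been produced, wait for it to be consumed
            try {
                wait(); // Make the producer thread wait
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++; // Produce one
        // "生产者" means "producer"
        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
        flag = true; // Mark the resource as produced
        notify(); // Wake up a thread that is waiting to operate on the resource
    }

    /**
     * Consume a resource
     */
    public synchronized void destroy() {
        while (!flag) { // Nothing has been produced yet, so wait
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // "消费者" means "consumer"
        System.out.println(Thread.currentThread().getName() + "消费者****" + number);
        flag = false; // Mark the resource as consumed
        notify();
    }
}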
With this version, the program prints up to some value, for example 74 after a production, and then hangs as if it were deadlocked.
Cause analysis
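notify() wakes up only one waiting thread, and that thread may be on the same side: a producer may wake the other producer, or a consumer the other consumer. Waking your own side is pointless, because with the while check that thread sees that the mark is unchanged and calls wait() again without notifying anyone. Eventually all four threads are waiting and nobody is left to call notify(), so while + notify() leaves the program stuck in a state that looks like a deadlock.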
Solution
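notifyAll() wakes up all waiting threads, so the threads on the other side are guaranteed to be woken as well. Threads on the same side re-check the mark in the while loop and simply go back to waiting.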
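As an aside that is not part of the original article, the java.util.concurrent.locks package offers another way to avoid waking your own side: each side waits on its own Condition, so a producer signals only consumers and a consumer signals only producers. The following is a minimal sketch under that assumption. The class name ConditionResource and the finite run in main are illustrative, the methods return the number instead of printing it, and awaitUninterruptibly() is used so that the methods declare no checked exception.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionResource {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition produced = lock.newCondition(); // consumers wait here
    private final Condition consumed = lock.newCondition(); // producers wait here
    private int number = 0;
    private boolean flag = false;

    /** Produces one resource and returns its number. */
    int create() {
        lock.lock();
        try {
            while (flag) { // still re-check the mark after every wake-up
                consumed.awaitUninterruptibly();
            }
            number++;
            flag = true;
            produced.signal(); // wakes one consumer, never a producer
            return number;
        } finally {
            lock.unlock();
        }
    }

    /** Consumes the current resource and returns its number. */
    int destroy() {
        lock.lock();
        try {
            while (!flag) {
                produced.awaitUninterruptibly();
            }
            flag = false;
            consumed.signal(); // wakes one producer, never a consumer
            return number;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionResource r = new ConditionResource();
        for (int i = 0; i < 2; i++) { // two producers and two consumers, five items each
            new Thread(() -> { for (int n = 0; n < 5; n++) r.create(); }).start();
            new Thread(() -> {
                for (int n = 0; n < 5; n++) System.out.println("consumed " + r.destroy());
            }).start();
        }
    }
}
```

The rest of this article stays with synchronized and notifyAll(), which needs no extra classes.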
Final code improvement (notify() -> notifyAll() in Resource)
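Resource.java
/**
 * Created by yuandl on 2016-10-11.
 * Resource
 */
public class Resource {
    /* Resource serial number */
    private int number = 0;
    /* Resource mark: true once a resource has been produced */
    private boolean flag = false;

    /**
     * Produce a resource
     */
    public synchronized void create() {
        while (flag) { // Check the mark first; if a resource has already been produced, wait for it to be consumed
            try {
                wait(); // Make the producer thread wait
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++; // Produce one
        // "生产者" means "producer"
        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
        flag = true; // Mark the resource as produced
        notifyAll(); // Wake up all threads that are waiting to operate on the resource
    }

    /**
     * Consume a resource
     */
    public synchronized void destroy() {
        while (!flag) { // Nothing has been produced yet, so wait
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // "消费者" means "consumer"
        System.out.println(Thread.currentThread().getName() + "消费者****" + number);
        flag = false; // Mark the resource as consumed
        notifyAll();
    }
}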
Output:
Thread-0生产者------------412
Thread-2消费者****412
Thread-0生产者------------413
Thread-3消费者****413
Thread-1生产者------------414
Thread-2消费者****414
Thread-1生产者------------415
Thread-2消费者****415
Thread-0生产者------------416
Thread-3消费者****416
Thread-1生产者------------417
Thread-3消费者****417
Thread-0生产者------------418
Thread-2消费者****418
Thread-0生产者------------419
Thread-3消费者****419
Thread-1生产者------------420
Thread-2消费者****420
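Reading the output by eye only covers a short window of the run. One possible way to check the final version more systematically is a finite run that records every consumed number and verifies that each number from 1 to n was consumed exactly once. The sketch below is an illustration, not the author's test: FiniteRunCheck, CheckedResource, run, and isOneToN are hypothetical names, and CheckedResource repeats the while + notifyAll() logic but returns the number from destroy() instead of printing it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FiniteRunCheck {
    /** Same logic as the final Resource, but destroy() returns the number. */
    static class CheckedResource {
        private int number = 0;
        private boolean flag = false;

        synchronized void create() {
            while (flag) {
                try { wait(); } catch (InterruptedException e) { e.printStackTrace(); }
            }
            number++;
            flag = true;
            notifyAll();
        }

        synchronized int destroy() {
            while (!flag) {
                try { wait(); } catch (InterruptedException e) { e.printStackTrace(); }
            }
            flag = false;
            notifyAll();
            return number; // still inside the lock, so no producer can change it yet
        }
    }

    /** Runs 2 producers and 2 consumers, perThread items each; returns consumed numbers sorted. */
    static List<Integer> run(int perThread) {
        CheckedResource resource = new CheckedResource();
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            threads.add(new Thread(() -> {
                for (int n = 0; n < perThread; n++) resource.create();
            }));
            threads.add(new Thread(() -> {
                for (int n = 0; n < perThread; n++) consumed.add(resource.destroy());
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        List<Integer> sorted = new ArrayList<>(consumed);
        Collections.sort(sorted);
        return sorted;
    }

    /** True if the sorted list is exactly 1, 2, ..., n. */
    static boolean isOneToN(List<Integer> sorted, int n) {
        if (sorted.size() != n) return false;
        for (int i = 0; i < n; i++) {
            if (sorted.get(i) != i + 1) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> result = run(500);
        System.out.println("each number consumed exactly once: " + isOneToN(result, 1000));
    }
}
```

A duplicate or a missing number, like 101 and 105 in the earlier run, would make isOneToN return false.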