ホームページ  >  記事  >  ウェブフロントエンド  >  Java マルチスレッドの同時協調的なプロデューサー/コンシューマー設計パターン

Java マルチスレッドの同時協調的なプロデューサー/コンシューマー設計パターン

伊谢尔伦
伊谢尔伦オリジナル
2016-11-21 13:31:371828ブラウズ

2 つのスレッド、1 つのプロデューサーと 1 つのコンシューマー

需要シナリオ

2 つのスレッド、1 つは生産を担当し、1 つは消費を担当し、1 つのプロデューサーが 1 つを生成し、コンシューマーが 1 つを消費します

関連する問題

同期の問題:リソースが複数のスレッドによって同時にアクセスされる場合でも、同じ整合性が維持されます。一般的に使用される同期方法は、マーキング メカニズムまたはロック メカニズムを使用することです。wait() / nofity() メソッドは、基本クラス Object の 2 つのメソッドです。つまり、すべての Java クラスがこれら 2 つのメソッドを持つことになります。同期メカニズムを実装します。

Wait() メソッド: バッファーがいっぱいまたは空の場合、プロデューサー/コンシューマー スレッドは自身の実行を停止し、ロックを放棄し、他のスレッドが実行できるように待機状態になります。

Notify() メソッド: プロデューサ/コンシューマがバッファにプロダクトを入れたり、バッファからプロダクトを取り出したりするとき、実行可能な通知を待機中の他のスレッドに送信し、同時にロックを放棄して自身を待機状態にします。

コード実装(合計3つのクラスとmainメソッドを持つテストクラス)


  Resource.java
  /**
  * Created by yuandl on 2016-10-11./**
  * 资源
  */
  public class Resource {
  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;
  /**
  * 生产资源
  */
  public synchronized void create() {
  if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
  try {
  wait();//让生产线程等待
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  number++;//生产一个
  System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
  flag = true;//将资源标记为已经生产
  notify();//唤醒在等待操作资源的线程(队列)
  }
  /**
  * 消费资源
  */
  public synchronized void destroy() {
  if (!flag) {
  try {
  wait();
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  System.out.println(Thread.currentThread().getName() + "消费者****" + number);
  flag = false;
  notify();
  }
 }

Producer.java

  /**
  * Created by yuandl on 2016-10-11.
  *
  /**
  * 生产者
  */
  public class Producer implements Runnable {
  private Resource resource;
  public Producer(Resource resource) {
  this.resource = resource;
  }
  @Override
  public void run() {
  while (true) {
  try {
  Thread.sleep(10);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  resource.create();
  }
  }
  }

Consumer.java

 /**
  * 消费者
  */
  public class Consumer implements Runnable {
  private Resource resource;
  public Consumer(Resource resource) {
  this.resource = resource;
  }
  @Override
  public void run() {
  while (true) {
  try {
  Thread.sleep(10);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  resource.destroy();
  }
  }
  }

ProducerConsumerTest.java

りー

印刷結果

  /**
  * Created by yuandl on 2016-10-11.
  */
  public class ProducerConsumerTest {
  public static void main(String args[]) {
  Resource resource = new Resource();
  new Thread(new Producer(resource)).start();//生产者线程
  new Thread(new Consumer(resource)).start();//消费者线程
  }
  }

上記の印刷結果から、問題がないことがわかります

複数のスレッド、複数のプロデューサ、複数のコンシューマに問題があります


需要シナリオ

4 つのスレッド、そのうち 2 つは生産を担当します、 2 つは消費を担当し、プロデューサーは 1 つを生産し、コンシューマーは 1 つを消費します

関連する問題

NotifyAll() メソッド: プロデューサー/コンシューマーがバッファーに製品を入れる/取り出すと、待機している他のすべてのユーザーにメッセージを発行しますスレッド 実行可能通知もロックを放棄し、待機状態になります。

コードを再度テストします


Thread-0生产者------------1
 Thread-1消费者****1
  Thread-0生产者------------2
  Thread-1消费者****2
  Thread-0生产者------------3
  Thread-1消费者****3
  Thread-0生产者------------4
  Thread-1消费者****4
  Thread-0生产者------------5
  Thread-1消费者****5
  Thread-0生产者------------6
  Thread-1消费者****6
  Thread-0生产者------------7
  Thread-1消费者****7
  Thread-0生产者------------8
  Thread-1消费者****8
  Thread-0生产者------------9
  Thread-1消费者****9
  Thread-0生产者------------10
  Thread-1消费者****10

実行結果

 ProducerConsumerTest.java
  /**
  * Created by yuandl on 2016-10-11.
  */
  public class ProducerConsumerTest {
  public static void main(String args[]) {
  Resource resource = new Resource();
  new Thread(new Consumer(resource)).start();//生产者线程
  new Thread(new Consumer(resource)).start();//生产者线程
  new Thread(new Producer(resource)).start();//消费者线程
  new Thread(new Producer(resource)).start();//消费者线程
  }
  }

上記の印刷結果から問題を発見しました

101 は 1 回生成され、2 回消費されました


105 は生成されましたが、消費されませんでした

原因分析

2 つのスレッドがプロデューサーの生産またはコンシューマーの消費を同時に操作する場合、プロデューサーまたは両方のスレッドが存在する場合は、wait() を再度 Notify() します。これは、一方のスレッドがマークを変更し、もう一方のスレッドが再度マークを変更したためです。これは直接実行した場合に判定マークがないことが原因です。

判定マークを一度だけ付けると、実行すべきでないスレッドが実行されてしまいます。データエラーが発生しました。

解決策

while判定マークは、スレッドが実行権を獲得した後に実行したいかどうかの問題を解決します! つまり、wait()の後にnotify()が続くたびに、マークを再度判定する必要があります

コードの改善(if-> in Resource ;while)

Resource.java

  Thread-0生产者------------100
  Thread-3消费者****100
  Thread-0生产者------------101
  Thread-3消费者****101
  Thread-2消费者****101
  Thread-1生产者------------102
  Thread-3消费者****102
  Thread-0生产者------------103
  Thread-2消费者****103
  Thread-1生产者------------104
  Thread-3消费者****104
  Thread-1生产者------------105
  Thread-0生产者------------106
  Thread-2消费者****106
  Thread-1生产者------------107
  Thread-3消费者****107
  Thread-0生产者------------108
  Thread-2消费者****108
  Thread-0生产者------------109
  Thread-2消费者****109
  Thread-1生产者------------110
  Thread-3消费者****110

また問題が見つかりました

本番後に74などの特定の値に出力すると、プログラムがロックされているかのようにスタックして実行されます。


原因分析

通知: このパーティがこのパーティを起動するのは 1 つのスレッドだけです。また、while判定マーク+通知は「デッドロック」を引き起こします。

解決策

NotifyAllは、自分のスレッドが必ず相手のスレッドを起こしてしまうという問題を解決します。

Resourceの最終コード改善(notify()->notifyAll())

Resource.java

 
 /**
  * Created by yuandl on 2016-10-11./**
  * 资源
  */
  public class Resource {
  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;
  /**
  * 生产资源
  */
  public synchronized void create() {
  while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
  try {
  wait();//让生产线程等待
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  number++;//生产一个
  System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
  flag = true;//将资源标记为已经生产
  notify();//唤醒在等待操作资源的线程(队列)
  }
  /**
  * 消费资源
  */
  public synchronized void destroy() {
  while (!flag) {
  try {
  wait();
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  System.out.println(Thread.currentThread().getName() + "消费者****" + number);
  flag = false;
  notify();
  }
  }

実行結果

 /**
  * Created by yuandl on 2016-10-11./**
  * 资源
  */
  public class Resource {
  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;
  /**
  * 生产资源
  */
  public synchronized void create() {
  while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
  try {
  wait();//让生产线程等待
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  number++;//生产一个
  System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
  flag = true;//将资源标记为已经生产
  notifyAll();//唤醒在等待操作资源的线程(队列)
  }
  /**
  * 消费资源
  */
  public synchronized void destroy() {
  while (!flag) {
  try {
  wait();
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  System.out.println(Thread.currentThread().getName() + "消费者****" + number);
  flag = false;
  notifyAll();
  }
  }

上記は完了しました、問題ありません

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。