Heim  >  Artikel  >  Java  >  Beispiele für Java-Parallelität CountDownLatch, CyclicBarrier und Semaphore

Beispiele für Java-Parallelität CountDownLatch, CyclicBarrier und Semaphore

黄舟
黄舟Original
2017-09-20 10:30:271212Durchsuche

In diesem Artikel wird hauptsächlich die gleichzeitige Java-Programmierung vorgestellt: CountDownLatch-, CyclicBarrier- und Semaphore-Beispiele >

In Java 1.5 werden einige sehr nützliche Hilfsklassen bereitgestellt, die uns bei der gleichzeitigen Programmierung helfen, wie z. B. CountDownLatch, CyclicBarrier und Semaphore. Heute lernen wir die Verwendung dieser drei Hilfsklassen kennen. Das Folgende ist die Gliederung des Inhaltsverzeichnisses für diesen Artikel:

1. CountDownLatch-Nutzung

2. CyclicBarrier-Nutzung

3. Semaphor-Nutzung



Bitte verzeihen Sie mir, wenn es Ungenauigkeiten gibt, und freuen Sie sich über Kritik und Korrekturen.

1. CountDownLatch-Verwendung

Die CountDownLatch-Klasse befindet sich unter dem Paket java.util.concurrent, mit dem zählerähnliche Funktionen implementiert werden können. Es gibt beispielsweise eine Aufgabe A, die auf den Abschluss der anderen vier Aufgaben warten muss, bevor sie ausgeführt werden kann. Zu diesem Zeitpunkt können Sie CountDownLatch verwenden, um diese Funktion zu implementieren.
Die CountDownLatch-Klasse stellt nur einen Konstruktor bereit:

Dann sind die folgenden drei Methoden die wichtigsten Methoden in der CountDownLatch-Klasse:

public CountDownLatch(int count) { }; //参数count为计数值

Sehen Sie sich unten ein Beispiel an, um die Verwendung von CountDownLatch zu verstehen:

public void await() throws InterruptedException { };  //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1

Ausführungsergebnis:

public class Test {
   public static void main(String[] args) {  
     final CountDownLatch latch = new CountDownLatch(2);

     new Thread(){
       public void run() {
         try {
           System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
          Thread.sleep(3000);
          System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
          latch.countDown();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
       };
     }.start();

     new Thread(){
       public void run() {
         try {
           System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
           Thread.sleep(3000);
           System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
           latch.countDown();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
       };
     }.start();

     try {
       System.out.println("等待2个子线程执行完毕...");
      latch.await();
      System.out.println("2个子线程已经执行完毕");
      System.out.println("继续执行主线程");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
   }
}


2. Verwendung von CyclicBarrier

线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕...
线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程

Bedeutet im wahrsten Sinne des Wortes eine Schleifenbarriere, durch die eine Gruppe von Threads warten kann In einem bestimmten Zustand werden sie alle gleichzeitig ausgeführt. Es wird Loopback genannt, da CyclicBarrier wiederverwendet werden kann, nachdem alle wartenden Threads freigegeben wurden. Nennen wir diesen Zustand vorerst Barriere. Wenn die Methode „await()“ aufgerufen wird, befindet sich der Thread in der Barriere.
Die CyclicBarrier-Klasse befindet sich unter dem Paket java.util.concurrent und stellt zwei Konstruktoren bereit:

Der Parameter Parteien bezieht sich auf die Anzahl Threads oder Die Aufgabe wartet auf den Barrierestatus. Der Parameter barrierAction ist das, was ausgeführt wird, wenn diese Threads den Barrierestatus erreichen.

Dann ist die wichtigste Methode in CyclicBarrier die Wait-Methode, die zwei überladene Versionen hat:
public CyclicBarrier(int parties, Runnable barrierAction) {
}

public CyclicBarrier(int parties) {
}

Die erste Version wird häufiger verwendet. Wird verwendet, um den aktuellen Thread anzuhalten, bis alle Threads den Barrierestatus erreichen, und um dann nachfolgende Aufgaben gleichzeitig auszuführen.

Die zweite Version besteht darin, diese Threads eine bestimmte Zeit lang warten zu lassen die den Barrierezustand noch nicht erreicht haben, direkt Lassen Sie den Thread, der die Barriere erreicht, nachfolgende Aufgaben ausführen.
public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

Ein paar Beispiele helfen Ihnen zu verstehen:

Angenommen, es gibt mehrere Threads, die Daten schreiben müssen, und erst nachdem alle Threads den Datenschreibvorgang abgeschlossen haben, können diese Threads damit fortfahren Folgendes: Zu diesem Zeitpunkt können Sie CyclicBarrier verwenden:

Ausführungsergebnis:

public class Test {
  public static void main(String[] args) {
    int N = 4;
    CyclicBarrier barrier = new CyclicBarrier(N);
    for(int i=0;i<N;i++)
      new Writer(barrier).start();
  }
  static class Writer extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) {
      this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
      System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
      try {
        Thread.sleep(5000);   //以睡眠来模拟写入数据操作
        System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }catch(BrokenBarrierException e){
        e.printStackTrace();
      }
      System.out.println("所有线程写入完毕,继续处理其他任务...");
    }
  }
}

Es Aus der obigen Ausgabe ist ersichtlich, dass jeder Schreibthread nach Abschluss des Datenschreibvorgangs darauf wartet, dass andere Threads den Schreibvorgang abschließen.

Nachdem alle Thread-Schreibvorgänge abgeschlossen sind, führen alle Threads weiterhin nachfolgende Vorgänge aus.
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

Wenn Sie zusätzliche Vorgänge ausführen möchten, nachdem alle Threads die Schreibvorgänge abgeschlossen haben, können Sie Runnable-Parameter für CyclicBarrier bereitstellen:

Ausführungsergebnisse:

public class Test {
  public static void main(String[] args) {
    int N = 4;
    CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
      @Override
      public void run() {
        System.out.println("当前线程"+Thread.currentThread().getName());  
      }
    });

    for(int i=0;i<N;i++)
      new Writer(barrier).start();
  }
  static class Writer extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) {
      this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
      System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
      try {
        Thread.sleep(5000);   //以睡眠来模拟写入数据操作
        System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }catch(BrokenBarrierException e){
        e.printStackTrace();
      }
      System.out.println("所有线程写入完毕,继续处理其他任务...");
    }
  }
}

Wie aus den Ergebnissen ersichtlich ist, wird, wenn alle vier Threads den Barrierezustand erreichen, ein Thread aus den vier Threads ausgewählt, um Runnable auszuführen.

Sehen wir uns die Auswirkung der Angabe der Wartezeit an:
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

Ausführungsergebnisse:

public class Test {
  public static void main(String[] args) {
    int N = 4;
    CyclicBarrier barrier = new CyclicBarrier(N);

    for(int i=0;i<N;i++) {
      if(i<N-1)
        new Writer(barrier).start();
      else {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        new Writer(barrier).start();
      }
    }
  }
  static class Writer extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) {
      this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
      System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
      try {
        Thread.sleep(5000);   //以睡眠来模拟写入数据操作
        System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
        try {
          cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }catch(BrokenBarrierException e){
        e.printStackTrace();
      }
      System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
    }
  }
}

Der obige Code verzögert absichtlich den Start des letzten Threads in der for-Schleife der Hauptmethode, denn nachdem die ersten drei Threads die Barriere erreicht haben, warteten sie auf die angegebene Zeit und stellten fest, dass der vierte Der Thread hat die Barriere noch nicht erreicht. Lösen Sie einfach eine Ausnahme aus und fahren Sie mit der Ausführung nachfolgender Aufgaben fort.

Darüber hinaus kann CyclicBarrier wiederverwendet werden, siehe folgendes Beispiel:
线程Thread-0正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3正在写入数据...
java.util.concurrent.TimeoutException
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-0所有线程写入完毕,继续处理其他任务...
  at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
  at java.util.concurrent.CyclicBarrier.await(Unknown Source)
  at com.cxh.test1.Test$Writer.run(Test.java:58)
java.util.concurrent.BrokenBarrierException
  at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
  at java.util.concurrent.CyclicBarrier.await(Unknown Source)
  at com.cxh.test1.Test$Writer.run(Test.java:58)
java.util.concurrent.BrokenBarrierException
  at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
  at java.util.concurrent.CyclicBarrier.await(Unknown Source)
  at com.cxh.test1.Test$Writer.run(Test.java:58)
Thread-2所有线程写入完毕,继续处理其他任务...
java.util.concurrent.BrokenBarrierException
线程Thread-3写入数据完毕,等待其他线程写入完毕
  at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
  at java.util.concurrent.CyclicBarrier.await(Unknown Source)
  at com.cxh.test1.Test$Writer.run(Test.java:58)
Thread-3所有线程写入完毕,继续处理其他任务...

Ausführungsergebnis:

/**
 * Java学习交流QQ群:589809992 我们一起学Java!
 */
public class Test {
  public static void main(String[] args) {
    int N = 4;
    CyclicBarrier barrier = new CyclicBarrier(N);

    for(int i=0;i<N;i++) {
      new Writer(barrier).start();
    }

    try {
      Thread.sleep(25000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    System.out.println("CyclicBarrier重用");

    for(int i=0;i<N;i++) {
      new Writer(barrier).start();
    }
  }
  static class Writer extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) {
      this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
      System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
      try {
        Thread.sleep(5000);   //以睡眠来模拟写入数据操作
        System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");

        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }catch(BrokenBarrierException e){
        e.printStackTrace();
      }
      System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
    }
  }
}

Aus den Ausführungsergebnissen geht hervor, dass die ersten vier Threads, nachdem sie den Barrierezustand überschritten haben, für eine neue Nutzungsrunde verwendet werden können. CountDownLatch kann nicht wiederverwendet werden.

线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
Thread-0所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-2所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...
线程Thread-7写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-4写入数据完毕,等待其他线程写入完毕
Thread-4所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-7所有线程写入完毕,继续处理其他任务...
3. Semaphor-Verwendung

Semaphor wird wörtlich übersetzt als Semaphor kann die Anzahl der Threads steuern, auf die gleichzeitig zugegriffen wird, und einen durch Erwerb erhalten ()-Berechtigung, warten Sie, wenn sie nicht verfügbar ist, und release() gibt eine Berechtigung frei.
Die Semaphore-Klasse befindet sich unter dem Paket java.util.concurrent. Sie stellt zwei Konstruktoren bereit:

Lassen Sie uns über den Vergleich im sprechen Semaphore-Klasse Mehrere wichtige Methoden sind zunächst die Methoden „acquire()“ und „release()“:

public Semaphore(int permits) {     //参数permits表示许可数目,即同时可以允许多少线程进行访问
  sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {  //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
  sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

acquire() wird verwendet, um eine Lizenz zu erhalten Es gibt keine Lizenz, es kann gewartet werden, bis die Erlaubnis eingeholt wird.

release() wird verwendet, um die Berechtigung freizugeben. Beachten Sie, dass vor der Veröffentlichung eine Genehmigung eingeholt werden muss.
public void acquire() throws InterruptedException { }   //获取一个许可
public void acquire(int permits) throws InterruptedException { }  //获取permits个许可
public void release() { }     //释放一个许可
public void release(int permits) { }  //释放permits个许可

Diese vier Methoden werden blockiert. Wenn Sie das Ausführungsergebnis sofort erhalten möchten, können Sie die folgenden Methoden verwenden:

public boolean tryAcquire() { };  //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

另外还可以通过availablePermits()方法得到可用的许可数目。

下面通过一个例子来看一下Semaphore的具体使用:

假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:


/**
 * Java学习交流QQ群:589809992 我们一起学Java!
 */
public class Test {
  public static void main(String[] args) {
    int N = 8;      //工人数
    Semaphore semaphore = new Semaphore(5); //机器数目
    for(int i=0;i<N;i++)
      new Worker(i,semaphore).start();
  }

  static class Worker extends Thread{
    private int num;
    private Semaphore semaphore;
    public Worker(int num,Semaphore semaphore){
      this.num = num;
      this.semaphore = semaphore;
    }

    @Override
    public void run() {
      try {
        semaphore.acquire();
        System.out.println("工人"+this.num+"占用一个机器在生产...");
        Thread.sleep(2000);
        System.out.println("工人"+this.num+"释放出机器");
        semaphore.release();      
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

执行结果:


工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器

下面对上面说的三个辅助类进行一个总结:

1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行; 另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

Das obige ist der detaillierte Inhalt vonBeispiele für Java-Parallelität CountDownLatch, CyclicBarrier und Semaphore. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn