Heim  >  Artikel  >  Java  >  Detaillierte Erläuterung der Verwendung des Thread-Pools in der gleichzeitigen Java-Programmierung

Detaillierte Erläuterung der Verwendung des Thread-Pools in der gleichzeitigen Java-Programmierung

黄舟
黄舟Original
2017-05-28 09:25:011490Durchsuche

Der folgende Editor bringt Ihnen einen Artikel über Java-ParallelitätProgrammierung_So verwenden Sie den Thread-Pool (ausführliche Erklärung). Der Herausgeber findet es ziemlich gut, deshalb werde ich es jetzt mit Ihnen teilen und es allen als Referenz geben. Folgen wir dem Editor und werfen wir einen Blick darauf

1. Die implizite Kopplung zwischen Aufgaben und Ausführungsstrategien

Der Ausführende kann Aufgaben entkoppelt von der Ausführung einreichen Strategie der Aufgabe

Nur Aufgaben des gleichen Typs mit geringem Unterschied in der Ausführungszeit können die maximale Leistung erzielen. Andernfalls legen Sie beispielsweise einige langwierige Aufgaben und kurzwändige Aufgaben in denselben Thread. Sofern der Thread-Pool nicht sehr groß ist, führt dies zu Problemen wie einem Deadlock

1. Thread-Hunger-Deadlock

Ähnlich wie: Kombinieren Sie die beiden Eine Aufgabe wird an einen Single-Thread-Pool übermittelt und die beiden Aufgaben sind voneinander abhängig. Wenn eine Aufgabe auf eine andere Aufgabe wartet, kommt es zu einem Deadlock. Die Leistung des Pools reicht nicht aus : Eine Aufgabe muss auf den Pool warten. Durch die Ausführung anderer Aufgaben kann es zu einem Hunger-Deadlock kommen

2. Thread-Poolgröße

Hinweis: Die Größe des Thread-Pools unterliegt auch anderen Einschränkungen, z. B. anderen Ressourcenpools: Datenbankverbindungspool

Wenn jede Aufgabe eine Verbindung ist, dann ist die Größe der Der Thread-Pool ist durch die Größe des Datenbankverbindungspools begrenzt

3. Konfigurieren Sie den ThreadPoolExecutor-Thread-Pool

Instanz:

1. Rückkehr über die Factory-Methode von Executors. Einige Standardimplementierungen

2. Passen Sie die Implementierung an, indem Sie ThreadPoolExecutor(.....) instanziieren. >

1. Unbegrenzte Warteschlange:

Wenn die Aufgabe eintrifft und der Thread-Pool voll ist, wartet die Aufgabe in der Warteschlange. Wenn die Aufgabe unendlich reicht, wird die Warteschlange unendlich erweitertZum Beispiel: Dies wird für Singletons und Thread-Pools fester Größe verwendet

2. Begrenzte Warteschlange: Wenn eine neue Aufgabe eintrifft und die Warteschlange voll ist, verwenden Sie Sättigungsstrategie

3. Synchrone Übergabe: Wenn der Thread-Pool groß ist, kommt es zu einer Verzögerung bei der Übergabe, nachdem die Aufgabe in die Warteschlange gestellt wurde Der Aufgabenproduzent bewirkt bald auch, dass die Aufgabe in die Warteschlange gestellt wird Synchr

onousQueue übergibt die Aufgabe direkt an den Arbeitsthread

Mechanismus: Um eine Aufgabe einzufügen, muss es eine geben a Der Thread wartet auf die Annahme. fügt einen Thread hinzu. Wenn der Thread gesättigt ist, lehnt er die Aufgabe ab. Cache

ThreadPool ist die Strategie verwendet

Sättigungsstrategie:

set

RejectedExecutionHan

dler, um die Sättigungsstrategie

1. Abbruch beenden (Standard):

Wirft eine Ausnahme aus Vom Anrufer behandelt

2. Verwerfen Verwerfen

3. Älteste verwerfen: Verwerfen Sie die älteste Aufgabe. Hinweis: Wenn sie

Priorität hat, verwirft die Warteschlange die Aufgabe mit der höchsten Priorität

4.C

alle erRuns: Rollback-Aufgabe, der Aufrufer-Thread erledigt sie selbst

4. Thread-Fabrik ThreadFactoy

Immer wenn ein Thread erstellt wird. Wenn: die Thread-Factory tatsächlich zum Abschließen aufruft

Benutzerdefinierte Thread-Factory:

ThreadFactory implementiertSie können die Thread-Factory anpassen Verhalten

:

wie UncaughtAusnahmeHandler usw.

5. Erweitern Sie ThreadPoolExecutor

Methoden, die von benutzerdefinierten Unterklassen überschrieben werden können: 1.afterExecute: Wenn nach dem Ende eine Runtime

Ausnahme ausgelöst wird, wird die Methode nicht ausgeführt
public class MyAppThread extends Thread {
  public static final String DEFAULT_NAME = "MyAppThread";
  private static volatile boolean debugLifecycle = false;
  private static final AtomicInteger created = new AtomicInteger();
  private static final AtomicInteger alive = new AtomicInteger();
  private static final Logger log = Logger.getAnonymousLogger();

  public MyAppThread(Runnable r) {
    this(r, DEFAULT_NAME);
  }

  public MyAppThread(Runnable runnable, String name) {
    super(runnable, name + "-" + created.incrementAndGet());
    //设置该线程工厂创建的线程的 未捕获异常的行为
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread t,
                     Throwable e) {
        log.log(Level.SEVERE,
            "UNCAUGHT in thread " + t.getName(), e);
      }
    });
  }

  public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created " + getName());
    try {
      alive.incrementAndGet();
      super.run();
    } finally {
      alive.decrementAndGet();
      if (debug) log.log(Level.FINE, "Exiting " + getName());
    }
  }

  public static int getThreadsCreated() {
    return created.get();
  }

  public static int getThreadsAlive() {
    return alive.get();
  }

  public static boolean getDebug() {
    return debugLifecycle;
  }

  public static void setDebug(boolean b) {
    debugLifecycle = b;
  }
}

2.vorAusführen: Wenn vor dem Start eine RuntimeException ausgelöst wird, wird die Aufgabe nicht ausgeführt3.beendet: Wenn der Thread-Pool geschlossen ist, kann dies geschehen zur Freigabe von Ressourcen usw. verwendet werden.

2

1.循环

在循环中,每次循环操作都是独立的

//串行化
  void processSequentially(List<Element> elements) {
    for (Element e : elements)
      process(e);
  }
  //并行化
  void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
      exec.execute(new Runnable() {
        public void run() {
          process(e);
        }
      });
  }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代
  public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
      results.add(n.compute());
      sequentialRecursive(n.getChildren(), results);
    }
  }
  //并行 计算compute 和串行迭代
  public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
      exec.execute(() -> results.add(n.compute()));
      parallelRecursive(exec, n.getChildren(), results);
    }
  }
  //调用并行方法的操作
  public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
      throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
  }

实例:

public class ConcurrentPuzzleSolver <P, M> {
  private final Puzzle<P, M> puzzle;
  private final ExecutorService exec;
  private final ConcurrentMap<P, Boolean> seen;
  protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

  public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
    this.puzzle = puzzle;
    this.exec = initThreadPool();
    this.seen = new ConcurrentHashMap<P, Boolean>();
    if (exec instanceof ThreadPoolExecutor) {
      ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
      tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }
  }

  private ExecutorService initThreadPool() {
    return Executors.newCachedThreadPool();
  }

  public List<M> solve() throws InterruptedException {
    try {
      P p = puzzle.initialPosition();
      exec.execute(newTask(p, null, null));
      // 等待ValueLatch中闭锁解开,则表示已经找到答案
      PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
      return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
    } finally {
      exec.shutdown();//最终主线程关闭线程池
    }
  }

  protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
    return new SolverTask(p, m, n);
  }

  protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
    SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
      super(pos, move, prev);
    }
    public void run() {
      //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
      //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
      if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
        return; // already solved or seen this position
      }
      if (puzzle.isGoal(pos)) {
        solution.setValue(this);
      } else {
        for (M m : puzzle.legalMoves(pos))
          exec.execute(newTask(puzzle.move(pos, m), m, this));
      }
    }
  }
}

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der Verwendung des Thread-Pools in der gleichzeitigen Java-Programmierung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn