Der Ausführende kann Aufgabenübermittlungs- und Aufgabenausführungsstrategien entkoppeln.
Nur Aufgaben sind vom gleichen Typ und haben unterschiedliche Ausführungszeiten. Nur wenn der Thread-Pool vorhanden ist Ist klein, kann die maximale Leistung erreicht werden, wenn einige zeitaufwändige Aufgaben und kurzzeitaufwendige Aufgaben in einem Thread-Pool abgelegt werden, es sei denn, der Thread-Pool ist sehr groß, was zu Deadlocks und anderen Problemen führt
Ähnlich wie: Senden Sie zwei Aufgaben an einen Single-Thread-Pool, und die beiden Aufgaben sind voneinander abhängig. Wenn eine Aufgabe auf die andere Aufgabe wartet, tritt ein Deadlock auf ist, dass der Pool nicht ausreicht
Definition: Eine Aufgabe muss auf die laufenden Ergebnisse anderer Aufgaben im Pool warten, und es kann zu einem Hunger-Deadlock kommen
Hinweis: Die Größe des Thread-Pools unterliegt auch anderen Einschränkungen, z. B. anderen Ressourcenpools: Datenbankverbindungspool
Wenn jede Aufgabe eine Verbindung ist, dann Die Größe des Thread-Pools hängt von der Größe des Datenbankverbindungspools
Instanz:
1. Geben Sie einen Standardwert zurück Implementierungen über die Factory-Methode von Executors
2. Passen Sie die Implementierung der
-WarteschlangeThread-Pools an, indem Sie ThreadPoolExecutor( .....) 1. Unbegrenzte Warteschlange: Wenn die Aufgabe eintrifft und der Thread-Pool voll ist, wartet die Aufgabe in der Warteschlange. Wenn die Aufgabe unendlich reicht, wird die Warteschlange unendlich erweitert
Zum Beispiel: Dies verwenden Singletons und Thread-Pools fester Größe
2. Begrenzte Warteschlange: Wenn eine neue Aufgabe eintrifft und die Warteschlange voll ist, verwenden Sie die Sättigungsstrategie
3. Synchrone Übergabe: Wenn der Thread-Pool groß ist, wird die Aufgabe in die Warteschlange gestellt. Wenn der Aufgabenerzeuger sehr schnell ist, kommt es zu einer Verzögerung in die Warteschlange gestellt werden
SynchronousQueue übergibt die Aufgabe direkt an den Arbeitsthread
Mechanismus: Wenn eine Aufgabe platziert wird, muss ein Thread darauf warten, sie anzunehmen . Wenn nicht, einen Thread hinzufügen,
. Beispiel:CacheThreadPool verwendet diese Strategie
Sättigungsstrategie:
setRejectedExecutionHandl er zum Ändern der Sättigungsstrategie
1 . BeendenAbbrechen (Standard): eine Ausnahme auslösen vom Anrufer behandelt
2. AbbrechenVerwerfen
3. VerwerfenDiscardOldest: Verwerfen Sie die älteste Aufgabe. Hinweis: Wenn sie Priorität hat, verwirft die Warteschlange die Aufgabe mit der höchsten Priorität
4.Calle erRuns: Rollback-Aufgabe, der aufrufende Thread erledigt sie selbst
4. Thread-Fabrik ThreadFactoy Wann immer Beim Erstellen eines Threads: Tatsächlich wird die Thread-Fabrik aufgerufen vollständig Benutzerdefinierte Thread-Factory: implementiert ThreadFactorySie können dasVerhalten der Thread-Factory anpassen: z. B. Uncaught ExceptionHandler usw.
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
timeException ausgelöst wird, wird die Methode nicht ausgeführt
2. 3.beendet: Wenn der Thread-Pool geschlossen ist, kann er zum Freigeben von Ressourcen usw. verwendet werden. 2. Parallelisierung des1, jede Schleifenoperation ist unabhängig
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2. Iteration
Wenn jede Iterationsoperation unabhängig voneinander ist, kann sie seriell ausgeführt werden
Zum Beispiel: Tiefensuche Algorithmus; Hinweis: Die Rekursion ist immer noch seriell, die Berechnung jedes Knotens erfolgt jedoch parallel
//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
Das obige ist der detaillierte Inhalt vonVerwendung des Java-Thread-Pools. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!