Heim >Java >javaLernprogramm >Java-Thread-Pool, der Produktionsblockierung unterstützt
Im Allgemeinen ist die Geschwindigkeit der Produktionsaufgaben größer als die Geschwindigkeit des Verbrauchs. Ein Detail ist die Länge der Warteschlange und die Abstimmung der Produktions- und Verbrauchsgeschwindigkeit.
Ein typisches Producer-Consumer-Modell sieht wie folgt aus:
Die Verwendung der von J.U.C bereitgestellten Queue-Implementierung in einer gleichzeitigen Umgebung kann Produktion und Konsum von Threads problemlos sicherstellen Sicherheit im Prozess. Hierbei ist zu beachten, dass die Warteschlange die Anfangskapazität festlegen muss, um zu verhindern, dass der Produzent zu schnell produziert, was zu einem sprunghaften Anstieg der Warteschlangenlänge und schließlich zu OutOfMemory führt.
Für die allgemeine Situation, in der die Produktion schneller ist als der Verbrauch. Wenn die Warteschlange voll ist, möchten wir nicht, dass Aufgaben ignoriert oder nicht ausgeführt werden. Zu diesem Zeitpunkt kann der Produzent eine Weile warten, bevor er die Aufgabe sendet. Ein besserer Ansatz besteht darin, den Produzenten in der Methode zum Senden der Aufgabe zu blockieren , und warten Sie, bis die Aufgabe übermittelt wird, wenn die Warteschlange nicht voll ist, damit keine Leerlaufzeit verschwendet wird. Das Blockieren ist ebenfalls sehr einfach. Sowohl ArrayBlockingQueue als auch LinkedBlockingQueue können Kapazitätsgrenzen festlegen, wenn die Warteschlange tatsächlich ausgeführt wird.
Darüber hinaus kann der Verbraucher die Aufgabe nicht erhalten, wenn die Warteschlange leer ist, und kann eine Weile warten, bevor er sie erhält. Ein besserer Ansatz besteht darin, die Take-Methode von BlockingQueue zum Blockieren und Warten zu verwenden Eine Aufgabe kann sofort ausgeführt werden. Um die Ausführung zu erhalten, wird empfohlen, die überladene Methode von take mit einem Timeout-Parameter aufzurufen. Der Thread wird nach dem Timeout beendet. Auf diese Weise muss der Verbraucher nicht endlos warten, wenn der Produzent die Produktion tatsächlich eingestellt hat.
So wird ein effizientes Produktions- und Konsummodell implementiert, das das Blockieren unterstützt.
Moment mal, da J.U.C uns bei der Implementierung des Thread-Pools geholfen hat, warum müssen wir diese Dinge immer noch verwenden? Ist es nicht bequemer, ExecutorService direkt zu verwenden?
Werfen wir einen Blick auf die Grundstruktur von ThreadPoolExecutor:
Wie Sie sehen können, wurden in ThreadPoolExecutor die BlockingQueue- und Consumer-Teile für uns implementiert Die direkte Verwendung der Thread-Pool-Implementierung bietet viele Vorteile, z. B. die dynamische Anpassung der Anzahl der Threads.
Aber das Problem besteht darin, dass selbst wenn Sie beim Erstellen von ThreadPoolExecutor manuell eine BlockingQueue als Warteschlangenimplementierung angeben, die Ausführungsmethode tatsächlich nicht blockiert, wenn die Warteschlange voll ist, weil ThreadPoolExecutor Die nicht blockierende Angebotsmethode von BlockingQueue heißt:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
Zu diesem Zeitpunkt müssen Sie etwas tun, um ein Ergebnis zu erzielen: Wenn der Produzent die Aufgabe übermittelt und die Warteschlange voll ist, kann der Produzent dies tun blockiert werden. Warten Sie, bis die Aufgabe verbraucht ist.
Der Schlüssel liegt darin, dass der Produzent in einer gleichzeitigen Umgebung nicht beurteilen kann, ob die Warteschlange voll ist, und ThreadPoolExecutor.getQueue().size() nicht aufgerufen werden kann, um zu beurteilen, ob die Warteschlange voll ist.
Wenn in der Implementierung des Thread-Pools die Warteschlange voll ist, wird der während der Erstellung übergebene RejectedExecutionHandler aufgerufen, um die Verarbeitung der Aufgabe abzulehnen. Die Standardimplementierung ist AbortPolicy, die direkt eine RejectedExecutionException auslöst.
Mehrere Ablehnungsstrategien werden hier nicht beschrieben. Diejenige, die unseren Anforderungen am nächsten kommt, ist CallerRunsPolicy. Diese Strategie ermöglicht es dem Thread, der die Aufgabe gesendet hat, die Aufgabe auszuführen, wenn die Warteschlange voll ist Produktion zulassen Der Produzent erledigt vorübergehend die Arbeit des Verbrauchers, sodass der Produzent zwar nicht blockiert ist, die übermittelte Aufgabe jedoch ebenfalls ausgesetzt wird.
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a <tt>CallerRunsPolicy</tt>. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Diese Strategie birgt jedoch auch versteckte Gefahren. Wenn es nur wenige Produzenten gibt, kann es sein, dass der Konsument während der Zeit, in der der Produzent Aufgaben verbraucht, alle Aufgaben abgeschlossen hat und die Warteschlange in einem leeren Zustand bleibt . Wenn der Produzent die Produktionsaufgabe ausführt, kann er erst nach Abschluss der Aufgabe fortgesetzt werden. Dieser Prozess kann zum Verhungern des Verbraucherthreads führen.
Unter Bezugnahme auf eine ähnliche Idee besteht der einfachste Weg darin, direkt einen RejectedExecutionHandler zu definieren und, wenn die Warteschlange voll ist, BlockingQueue.put aufzurufen, um die Produzentenblockierung zu implementieren:
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
Auf diese Weise , Wir müssen uns nicht mehr um die Logik von Warteschlange und Verbraucher kümmern, sondern müssen uns nur noch auf die Implementierungslogik von Produzenten- und Verbraucher-Threads konzentrieren und einfach Aufgaben an den Thread-Pool senden.
Im Vergleich zum Originaldesign kann diese Methode die Codemenge erheblich reduzieren und viele Probleme in gleichzeitigen Umgebungen vermeiden. Natürlich können Sie auch andere Methoden verwenden, z. B. die Verwendung von Semaphoren, um den Eintrag beim Senden einzuschränken. Wenn Sie jedoch nur möchten, dass der Produzent blockiert, wird es kompliziert.
Weitere Artikel zu Java-Thread-Pools, die das Blockieren der Produktion unterstützen, finden Sie auf der chinesischen PHP-Website!