Heim  >  Artikel  >  Java  >  Verwendung des Java-Thread-Pools

Verwendung des Java-Thread-Pools

大家讲道理
大家讲道理Original
2017-05-28 11:33:171566Durchsuche

1. Implizite Kopplung zwischen Aufgaben und Ausführungsstrategien

Der Ausführende kann Aufgabenübermittlungs- und Aufgabenausführungsstrategien entkoppeln.

Nur Aufgaben sind vom gleichen Typ und haben unterschiedliche Ausführungszeiten. Nur wenn der Thread-Pool vorhanden ist Ist klein, kann die maximale Leistung erreicht werden, wenn einige zeitaufwändige Aufgaben und kurzzeitaufwendige Aufgaben in einem Thread-Pool abgelegt werden, es sei denn, der Thread-Pool ist sehr groß, was zu Deadlocks und anderen Problemen führt

1. Thread-Hunger-Deadlock

Ähnlich wie: Senden Sie zwei Aufgaben an einen Single-Thread-Pool, und die beiden Aufgaben sind voneinander abhängig. Wenn eine Aufgabe auf die andere Aufgabe wartet, tritt ein Deadlock auf ist, dass der Pool nicht ausreicht

Definition: Eine Aufgabe muss auf die laufenden Ergebnisse anderer Aufgaben im Pool warten, und es kann zu einem Hunger-Deadlock kommen

2 Thread-Poolgröße

 

Hinweis: Die Größe des Thread-Pools unterliegt auch anderen Einschränkungen, z. B. anderen Ressourcenpools: Datenbankverbindungspool

Wenn jede Aufgabe eine Verbindung ist, dann Die Größe des Thread-Pools hängt von der Größe des Datenbankverbindungspools

3 ab. Konfigurieren Sie den ThreadPoolExecutor-Thread-Pool

Instanz:

1. Geben Sie einen Standardwert zurück Implementierungen über die Factory-Methode von Executors

2. Passen Sie die Implementierung der

-Warteschlange

des

Thread-Pools an, indem Sie ThreadPoolExecutor( .....) 1. Unbegrenzte Warteschlange: Wenn die Aufgabe eintrifft und der Thread-Pool voll ist, wartet die Aufgabe in der Warteschlange. Wenn die Aufgabe unendlich reicht, wird die Warteschlange unendlich erweitert

Zum Beispiel: Dies verwenden Singletons und Thread-Pools fester Größe

2. Begrenzte Warteschlange: Wenn eine neue Aufgabe eintrifft und die Warteschlange voll ist, verwenden Sie die Sättigungsstrategie

3. Synchrone Übergabe: Wenn der Thread-Pool groß ist, wird die Aufgabe in die Warteschlange gestellt. Wenn der Aufgabenerzeuger sehr schnell ist, kommt es zu einer Verzögerung in die Warteschlange gestellt werden

 SynchronousQueue übergibt die Aufgabe direkt an den Arbeitsthread

Mechanismus: Wenn eine Aufgabe platziert wird, muss ein Thread darauf warten, sie anzunehmen . Wenn nicht, einen Thread hinzufügen,

. Beispiel:

CacheThreadPool verwendet diese Strategie

Sättigungsstrategie:

 

setRejectedExecutionHandl er zum Ändern der Sättigungsstrategie

1 . Beenden

Abbrechen (Standard): eine Ausnahme auslösen vom Anrufer behandelt

2. Abbrechen

Verwerfen

3. Verwerfen

DiscardOldest: Verwerfen Sie die älteste Aufgabe. Hinweis: Wenn sie Priorität hat, verwirft die Warteschlange die Aufgabe mit der höchsten Priorität

 4.

Calle erRuns: Rollback-Aufgabe, der aufrufende Thread erledigt sie selbst

4. Thread-Fabrik ThreadFactoy

Wann immer Beim Erstellen eines Threads: Tatsächlich wird die Thread-Fabrik aufgerufen vollständig

Benutzerdefinierte Thread-Factory: implementiert ThreadFactory

Sie können das

Verhalten der Thread-Factory anpassen: z. B. Uncaught ExceptionHandler usw.

 


public class MyAppThread extends Thread {    public static final String DEFAULT_NAME = "MyAppThread";    private static volatile boolean debugLifecycle = false;    private static final AtomicInteger created = new AtomicInteger();    private static final AtomicInteger alive = new AtomicInteger();    private static final Logger log = Logger.getAnonymousLogger();    public MyAppThread(Runnable r) {        this(r, DEFAULT_NAME);
    }    public MyAppThread(Runnable runnable, String name) {        super(runnable, name + "-" + created.incrementAndGet());        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }    public void run() {        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;        if (debug) log.log(Level.FINE, "Created " + getName());        try {
            alive.incrementAndGet();            super.run();
        } finally {
            alive.decrementAndGet();            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }    public static int getThreadsCreated() {        return created.get();
    }    public static int getThreadsAlive() {        return alive.get();
    }    public static boolean getDebug() {        return debugLifecycle;
    }    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}


5. Extend ThreadPoolExecutor

Methoden, die überschrieben werden können durch benutzerdefinierte Unterklassen:

1.afterExecute: Nach dem Ende, wenn Run

timeException ausgelöst wird, wird die Methode nicht ausgeführt

 2.

beforeAusführen: Wenn vor dem Start eine RuntimeException ausgelöst wird, wird die Aufgabe nicht ausgeführt

3.beendet: Wenn der Thread-Pool geschlossen ist, kann er zum Freigeben von Ressourcen usw. verwendet werden.

2. Parallelisierung des

rekursiven Algorithmus

1, jede Schleifenoperation ist unabhängig


//串行化
    void processSequentially(List<Element> elements) {        for (Element e : elements)
            process(e);
    }    //并行化
    void processInParallel(Executor exec, List<Element> elements) {        for (final Element e : elements)
            exec.execute(new Runnable() {                public void run() {
                    process(e);
                }
            });
    }

2. Iteration

Wenn jede Iterationsoperation unabhängig voneinander ist, kann sie seriell ausgeführt werden

Zum Beispiel: Tiefensuche Algorithmus; Hinweis: Die Rekursion ist immer noch seriell, die Berechnung jedes Knotens erfolgt jedoch parallel

 


//串行 计算compute 和串行迭代
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }    //并行 计算compute 和串行迭代
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }    //调用并行方法的操作
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);        return resultQueue;
    }


 

  实例:

  


public class ConcurrentPuzzleSolver <P, M> {    private final Puzzle<P, M> puzzle;    private final ExecutorService exec;    private final ConcurrentMap<P, Boolean> seen;    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {        this.puzzle = puzzle;        this.exec = initThreadPool();        this.seen = new ConcurrentHashMap<P, Boolean>();        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }    private ExecutorService initThreadPool() {        return Executors.newCachedThreadPool();
    }    public List<M> solve() throws InterruptedException {        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));            // 等待ValueLatch中闭锁解开,则表示已经找到答案
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();//最终主线程关闭线程池        }
    }    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {        return new SolverTask(p, m, n);
    }    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {            super(pos, move, prev);
        }        public void run() {            //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;            //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null){                return; // already solved or seen this position            }            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}


 

  

 

Das obige ist der detaillierte Inhalt vonVerwendung des Java-Thread-Pools. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:Java NIO: I/O-ModellNächster Artikel:Java NIO: I/O-Modell