Maison  >  Article  >  Java  >  Utilisation du pool de threads Java

Utilisation du pool de threads Java

大家讲道理
大家讲道理original
2017-05-28 11:33:171527parcourir

1. Couplage implicite entre les tâches et les stratégies d'exécution

L'exécuteur peut découpler les stratégies de soumission et d'exécution des tâches

Seules les tâches sont du même type et ont des temps d'exécution différents Uniquement lorsque le pool de threads est petit, les performances maximales peuvent être atteintes. Sinon, si certaines tâches longues et courtes sont placées dans un pool de threads, à moins que le pool de threads ne soit très grand, cela provoquera des blocages et d'autres problèmes

1. Blocage par manque de threads

Similaire à : Soumettez deux tâches à un pool à un seul thread, et les deux tâches dépendent l'une de l'autre. Si une tâche attend une autre tâche, une impasse se produira ; que le pool n'est pas suffisant

Définition : une tâche doit attendre les résultats en cours d'exécution d'autres tâches dans le pool, et une impasse de famine peut se produire

Taille du pool de threads

 

Remarque : La taille du pool de threads est également soumise à d'autres restrictions, comme d'autres pools de ressources : pool de connexion à la base de données

Si chaque tâche est une connexion, alors la taille du pool de threads est soumis à la taille du pool de connexion à la base de données

3. Configurer le pool de threads ThreadPoolExecutor

instance :

1. Renvoie certaines implémentations par défaut. via la méthode d'usine des exécuteurs

2. Personnalisez l'implémentation de la file d'attente

du pool de threads en instanciant ThreadPoolExecutor(. ....) 1. File d'attente illimitée : lorsque la tâche arrive et que le pool de threads est plein, la tâche attend dans la file d'attente. Si la tâche atteint l'infini, la file d'attente s'étendra à l'infini

Par exemple : ceci est. ce qu'utilisent les singletons et les pools de threads de taille fixe

2. File d'attente limitée : si une nouvelle tâche arrive et que la file d'attente est pleine, utilisez la

stratégie de saturation 3. Transfert synchrone : si le pool de threads est volumineux, placez la tâche. Après avoir été mise dans la file d'attente, il y aura un retard dans le transfert. Si le producteur de tâches est très rapide, cela entraînera également l'exécution de la tâche. queued

 Syn

chr

onousQueue remettra directement la tâche au thread de travail Mécanisme : Lorsqu'une tâche est placée, il doit y avoir un thread en attente pour l'accepter. Sinon, alors

ajoutez un

thread si le thread est saturé, rejetez la tâche Par exemple :

Cache

ThreadPool utilise cette stratégie.

Stratégie de saturation : 

set

RejectedExecutionHandl euh pour modifier la stratégie de saturation 1. Terminer

Abandonner

(par défaut) : lancer une exception gérée par l'appelant 2. Abandonner

Rejeter

3. Rejeter

DiscardOldest

 : supprimez la tâche la plus ancienne, remarque : si elle est prioritaire la file d'attente supprimera la tâche la plus prioritaire  4.

C

touterRuns : tâche de rollback, le thread appelant la gère tout seul 4. Thread factory ThreadFactoy

À chaque fois Lors de la création d'un thread : En fait, la thread factory est appelée pour terminer.

Fabrique de threads personnalisée : implémente ThreadFactory

Vous pouvez personnaliser le

comportement

de la fabrique de threads : comme Uncaught ExceptionGestionnaire, etc. 


public class MyAppThread extends Thread {    public static final String DEFAULT_NAME = "MyAppThread";    private static volatile boolean debugLifecycle = false;    private static final AtomicInteger created = new AtomicInteger();    private static final AtomicInteger alive = new AtomicInteger();    private static final Logger log = Logger.getAnonymousLogger();    public MyAppThread(Runnable r) {        this(r, DEFAULT_NAME);
    }    public MyAppThread(Runnable runnable, String name) {        super(runnable, name + "-" + created.incrementAndGet());        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }    public void run() {        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;        if (debug) log.log(Level.FINE, "Created " + getName());        try {
            alive.incrementAndGet();            super.run();
        } finally {
            alive.decrementAndGet();            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }    public static int getThreadsCreated() {        return created.get();
    }    public static int getThreadsAlive() {        return alive.get();
    }    public static boolean getDebug() {        return debugLifecycle;
    }    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}


5. Étendre ThreadPoolExecutor

Méthodes qui peuvent être remplacées par sous-classes personnalisées :

1.afterExecute : Après la fin, si une exception Run

time

est levée, la méthode ne sera pas exécutée  2.

avant

Exécuter : Avant de démarrer, si une RuntimeException est levée, la tâche ne sera pas exécutée 3.terminée : Lorsque le pool de threads est fermé, il peut être utilisé pour libérer des ressources, etc.

2. Parallélisation de l'algorithme

récursif

1, chaque opération de boucle est indépendante

<.>


2. Itération
//串行化
    void processSequentially(List<Element> elements) {        for (Element e : elements)
            process(e);
    }    //并行化
    void processInParallel(Executor exec, List<Element> elements) {        for (final Element e : elements)
            exec.execute(new Runnable() {                public void run() {
                    process(e);
                }
            });
    }


Si chaque opération d'itération est indépendante les unes des autres, elle peut être exécutée en série

Par exemple : algorithme de

recherche

en profondeur d'abord ; remarque : la récursivité est toujours en série, Cependant, le calcul de chaque nœud est parallèle 


//串行 计算compute 和串行迭代
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }    //并行 计算compute 和串行迭代
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }    //调用并行方法的操作
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);        return resultQueue;
    }


 

  实例:

  


public class ConcurrentPuzzleSolver <P, M> {    private final Puzzle<P, M> puzzle;    private final ExecutorService exec;    private final ConcurrentMap<P, Boolean> seen;    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {        this.puzzle = puzzle;        this.exec = initThreadPool();        this.seen = new ConcurrentHashMap<P, Boolean>();        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }    private ExecutorService initThreadPool() {        return Executors.newCachedThreadPool();
    }    public List<M> solve() throws InterruptedException {        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));            // 等待ValueLatch中闭锁解开,则表示已经找到答案
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();//最终主线程关闭线程池        }
    }    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {        return new SolverTask(p, m, n);
    }    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {            super(pos, move, prev);
        }        public void run() {            //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;            //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null){                return; // already solved or seen this position            }            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}


 

  

 

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Article précédent:Java NIO : modèle d'E/SArticle suivant:Java NIO : modèle d'E/S