Maison  >  Article  >  Java  >  Explication détaillée de la façon d'utiliser le pool de threads dans la programmation simultanée Java

Explication détaillée de la façon d'utiliser le pool de threads dans la programmation simultanée Java

黄舟
黄舟original
2017-05-28 09:25:011496parcourir

L'éditeur suivant vous proposera un article sur la concurrence JavaProgrammation_Comment utiliser le pool de threads (explication détaillée). L'éditeur pense que c'est plutôt bien, alors je vais le partager avec vous maintenant et le donner comme référence. Suivons l'éditeur pour y jeter un œil

1. Le couplage implicite entre les tâches et les stratégies d'exécution

L'exécuteur peut soumettre des tâches découplées de l'exécution stratégie de la tâche

Seules les tâches du même type avec peu de différence dans le temps d'exécution peuvent atteindre des performances maximales, sinon, par exemple, mettre des tâches longues et courtes dans le même pool, à moins que le pool de threads ne soit très volumineux, cela entraînera des problèmes tels qu'un blocage

1. Un blocage par manque de threads

Similaire à : Combinez les deux. Une tâche est soumise à un pool monothread, et les deux tâches dépendent l'une de l'autre. Si une tâche attend une autre tâche, un blocage se produira ; la performance est que le pool n'est pas suffisant

Définition. : une tâche doit attendre le pool Suite à l'exécution d'autres tâches, une impasse de famine peut se produire

2. Taille du pool de threads

Remarque : La taille du pool de threads est également soumise à d'autres restrictions, telles que d'autres pools de ressources : pool de connexions à la base de données

Si chaque tâche est une connexion, alors la taille du le pool de threads est limité par la taille du pool de connexions à la base de données

3. Configurez le pool de threads ThreadPoolExecutor

Instance :

1. Retour via la méthode d'usine des exécuteurs Quelques implémentations par défaut

2. Personnalisez l'implémentation en instanciant ThreadPoolExecutor(.....) >

. 1. File d'attente illimitée : Lorsque la tâche arrive et que le pool de threads est plein, la tâche attend dans la file d'attente. Si la tâche atteint l'infini, la file d'attente s'étendra à l'infini

Par exemple : ceci. est utilisé pour les singletons et les pools de threads de taille fixe

2 File d'attente limitée :

Si une nouvelle tâche arrive et que la file d'attente est pleine, utilisez

Stratégie de saturation.

3. Transfert synchrone : Si le pool de threads est grand, il y aura un délai de transfert après avoir mis la tâche dans la file d'attente. le producteur de tâches provoque également bientôt la mise en file d'attente de la tâche

Syn

chronousQueue remet directement la tâche au thread de travailMécanisme : pour mettre une tâche dedans, il doit y avoir a Le thread attend l'acceptation. Sinon, ajoute un

Si le thread est saturé, rejette la tâche

Par exemple : Cache

ThreadPool est la stratégie. utilisé

Stratégie de saturation :

set

RejectedExecutionHan

dler pour modifier la stratégie de saturation

1. Terminer l'abandon (par défaut) : Lève une exception Gérée par l'appelant

2. Rejeter Rejeter 3. Supprimer la plus ancienne : Abandonnez la tâche la plus ancienne. Remarque : Si elle est

prioritaire

, la file d'attente supprimera la tâche la plus prioritaire

4.Ctoutes. erRuns : tâche de restauration, le thread appelant la gère tout seul

4. Thread factory ThreadFactoy

Chaque fois qu'un thread est créé Quand : appelle réellement la fabrique de threads pour terminer Fabrique de threads personnalisée :

implémente ThreadFactory

Vous pouvez personnaliser la fabrique de threads

Comportement

 :tels que UncaughtException

Gestionnaire, etc.

5. Étendre ThreadPoolExecutor

public class MyAppThread extends Thread {
  public static final String DEFAULT_NAME = "MyAppThread";
  private static volatile boolean debugLifecycle = false;
  private static final AtomicInteger created = new AtomicInteger();
  private static final AtomicInteger alive = new AtomicInteger();
  private static final Logger log = Logger.getAnonymousLogger();

  public MyAppThread(Runnable r) {
    this(r, DEFAULT_NAME);
  }

  public MyAppThread(Runnable runnable, String name) {
    super(runnable, name + "-" + created.incrementAndGet());
    //设置该线程工厂创建的线程的 未捕获异常的行为
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread t,
                     Throwable e) {
        log.log(Level.SEVERE,
            "UNCAUGHT in thread " + t.getName(), e);
      }
    });
  }

  public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created " + getName());
    try {
      alive.incrementAndGet();
      super.run();
    } finally {
      alive.decrementAndGet();
      if (debug) log.log(Level.FINE, "Exiting " + getName());
    }
  }

  public static int getThreadsCreated() {
    return created.get();
  }

  public static int getThreadsAlive() {
    return alive.get();
  }

  public static boolean getDebug() {
    return debugLifecycle;
  }

  public static void setDebug(boolean b) {
    debugLifecycle = b;
  }
}
Méthodes qui peuvent être remplacées par des sous-classes personnalisées :

1.afterExecute : Après la fin, si une exception Runtimeest levée, la méthode ne sera pas exécutée

2.

before

Execute : Avant de démarrer, si une RuntimeException est levée, la tâche ne sera pas exécutée

3.terminated : Lorsque le pool de threads est fermé, il peut être utilisé pour libérer des ressources, etc.

2. Parallélisation de l'algorithme

récursif

1.循环

在循环中,每次循环操作都是独立的

//串行化
  void processSequentially(List<Element> elements) {
    for (Element e : elements)
      process(e);
  }
  //并行化
  void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
      exec.execute(new Runnable() {
        public void run() {
          process(e);
        }
      });
  }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代
  public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
      results.add(n.compute());
      sequentialRecursive(n.getChildren(), results);
    }
  }
  //并行 计算compute 和串行迭代
  public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
      exec.execute(() -> results.add(n.compute()));
      parallelRecursive(exec, n.getChildren(), results);
    }
  }
  //调用并行方法的操作
  public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
      throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
  }

实例:

public class ConcurrentPuzzleSolver <P, M> {
  private final Puzzle<P, M> puzzle;
  private final ExecutorService exec;
  private final ConcurrentMap<P, Boolean> seen;
  protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

  public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
    this.puzzle = puzzle;
    this.exec = initThreadPool();
    this.seen = new ConcurrentHashMap<P, Boolean>();
    if (exec instanceof ThreadPoolExecutor) {
      ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
      tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }
  }

  private ExecutorService initThreadPool() {
    return Executors.newCachedThreadPool();
  }

  public List<M> solve() throws InterruptedException {
    try {
      P p = puzzle.initialPosition();
      exec.execute(newTask(p, null, null));
      // 等待ValueLatch中闭锁解开,则表示已经找到答案
      PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
      return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
    } finally {
      exec.shutdown();//最终主线程关闭线程池
    }
  }

  protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
    return new SolverTask(p, m, n);
  }

  protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
    SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
      super(pos, move, prev);
    }
    public void run() {
      //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
      //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
      if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
        return; // already solved or seen this position
      }
      if (puzzle.isGoal(pos)) {
        solution.setValue(this);
      } else {
        for (M m : puzzle.legalMoves(pos))
          exec.execute(newTask(puzzle.move(pos, m), m, this));
      }
    }
  }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn