Maison  >  Article  >  Java  >  Introduction détaillée à l'utilisation de ThreadPoolExecutor pour exécuter des tâches indépendantes monothread en parallèle en Java

Introduction détaillée à l'utilisation de ThreadPoolExecutor pour exécuter des tâches indépendantes monothread en parallèle en Java

黄舟
黄舟original
2017-03-23 11:06:442920parcourir

Le cadre d'exécution des tâches a été introduit dans Java SE 5.0, ce qui constitue un grand progrès dans la simplification du développement de programmation multithread. Utilisez ce framework pour gérer facilement les tâches : gérez le cycle de vie et la stratégie d'exécution de la tâche.

Dans cet article, nous utilisons un exemple simple pour montrer la flexibilité et la simplicité apportées par ce framework.

Basique

Le framework d'exécution introduit l'interface Executor pour gérer l'exécution des tâches. Executor est une interface utilisée pour soumettre des tâches Runnable. Cette interface isole la soumission des tâches de l'exécution des tâches : les exécuteurs avec des stratégies d'exécution différentes implémentent tous la même interface de soumission. Changer la stratégie d'exécution n'affectera pas la logique de soumission des tâches.

Si vous souhaitez soumettre un objet Runnable pour exécution, c'est très simple :

Executor exec = …;
exec.execute(runnable);

Pool de threads

Comme mentionné ci-dessus, comment l'exécuteur exécute-t-il l'objet soumis tâche exécutable et non spécifié dans l'interface de l'exécuteur, cela dépend du type spécifique d'exécuteur que vous utilisez. Ce framework fournit plusieurs exécuteurs différents et les stratégies d'exécution varient selon les scénarios.

Le type d'exécuteur le plus courant que vous pouvez utiliser est l'exécuteur de pool de threads, qui est une instance de la classe ThreadPoolExecutor (et de ses sous-classes). ThreadPoolExecutor gère un pool de threads et une file d'attente de travail . Le pool de threads stocke les threads de travail utilisés pour exécuter les tâches.

Vous devez avoir compris la notion de "pool" dans d'autres technologies. L'un des principaux avantages de l'utilisation d'un « pool » est de réduire le coût de création des ressources. Une fois utilisées et libérées, elles peuvent être réutilisées. Un autre avantage indirect est que vous pouvez contrôler la quantité de ressources utilisées. Par exemple, vous pouvez ajuster la taille du pool de threads pour obtenir la charge souhaitée sans endommager les ressources système.

Ce framework fournit une classe d'usine appelée Executors pour créer un pool de threads. À l’aide de cette classe d’ingénierie, vous pouvez créer des pools de threads avec différentes caractéristiques. Bien que l'implémentation sous-jacente soit souvent la même (ThreadPoolExecutor), les classes d'usine vous permettent de configurer rapidement un pool de threads sans utiliser de constructeurs complexes. Les méthodes d'usine de la classe d'ingénierie sont :

  • newFixedThreadPool : Cette méthode renvoie un pool de threads avec une capacité maximale fixe. Il crée de nouveaux threads à la demande et le nombre de threads n'est pas supérieur au nombre configuré. Lorsque le nombre de threads atteint le maximum, le pool de threads reste inchangé.

  • newCachedThreadPool : cette méthode renvoie un pool de threads illimité, c'est-à-dire qu'il n'y a pas de limite de nombre maximum. Mais lorsque la charge de travail diminue, ce type de pool de threads détruira les threads inutiles.

  • newSingleThreadedExecutor : Cette méthode renvoie un exécuteur, qui peut garantir que toutes les tâches sont exécutées dans un seul thread.

  • newScheduledThreadPool : Cette méthode renvoie un pool de threads de taille fixe, qui prend en charge l'exécution de tâches retardées et planifiées.

Ce n'est que le début. Il existe d'autres utilisations d'Executor qui dépassent le cadre de cet article. Je vous recommande fortement d'étudier les éléments suivants :

  • Les méthodes de gestion du cycle de vie, qui sont déclarées par l'interface ExecutorService (telles que comme shutdown() et waitTermination()).

  • Utilisez CompletionService pour interroger l'état de la tâche et obtenir la valeur de retour, s'il existe une valeur de retour.

L'interface ExecutorService est particulièrement importante car elle permet d'arrêter le pool de threads et de garantir que les ressources qui ne sont plus utilisées sont nettoyées. Heureusement, l'interface ExecutorService est assez simple et explicite, je recommande d'étudier sa documentation de manière approfondie.

En gros, lorsque vous envoyez un message shutdown() à ExecutorService, il ne recevra pas les tâches nouvellement soumises, mais les tâches toujours dans la file d'attente continueront d'être traitées. Vous pouvez utiliser isTerminate() pour interroger l'état de terminaison d'ExecutorService, ou utiliser la méthode waitTermination(...) pour attendre la fin d'ExecutorService. Si vous passez un délai d'attente maximum en paramètre, la méthode waitTermination n'attendra pas éternellement.

Avertissement : Il existe des erreurs et de la confusion dans la compréhension du fait que les processus JVM ne se termineront jamais. Si vous ne fermez pas executorService et détruisez simplement le thread sous-jacent, la JVM ne se fermera pas. Lorsque le dernier thread ordinaire (thread non démon) se termine, la JVM se ferme également.

Configuration de ThreadPoolExecutor

Si vous décidez de ne pas utiliser la classe d'usine Executor, mais de créer manuellement un ThreadPoolExecutor, vous devez utiliser le constructeur pour le créer et le configurer. Voici l'un des constructeurs les plus utilisés de cette classe :

public ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAlive,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler);

Comme vous pouvez le constater, vous pouvez configurer les éléments suivants :

  • La taille du noyau pool (threads La taille que le pool utilisera)

  • Taille maximale du pool

  • Le temps de survie après lequel les threads inactifs sont détruits

  • File d'attente de travail pour stocker les tâches

  • Stratégie à exécuter après le rejet de la soumission des tâches

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。如果队列填满,以后提交的任务会被“拒绝”。

  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为什么这个类名中引用动词“rejected”。你可以实现自己的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

  • 悄悄地丢弃一个任务

  • 丢弃最旧的任务,重新提交最新的

  • 在调用者的线程中执行被拒绝的任务

什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,我们有了食谱的所有原料,我们需要做的仅仅是把程序建立起来并调节。在写代码前,我们必须了解下程序的负载。我列一下我检测到的内容:

  • 有非常多的文件需要被周期性地扫描:每个目录包含1~2百万个文件

  • 扫描算法很快,可以并行化

  • 处理一个文件至少需要1s,甚至上升到2s或3s

  • 处理文件时,性能瓶颈主要是CPU

  • CPU利用率必须可调,根据一天时间的不同而使用不同的负载配置。

我需要这样一个线程池,它的大小在程序运行的时候通过负载配置来设置。我倾向于根据负载策略创建一个固定大小的线程池。由于线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其他资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后我需要使用阻塞队列创建一个ThreadPoolExecutor,可以限制提交的任务数。为什么?是这样,扫描算法执行很快,很快就产生庞大数量需要处理的文件。数量有多庞大呢?很难预测,因为变动太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

而且,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。为什么?因为当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会停止,转而去处理一个文件,处理结束后马上又会扫描目录。

下面是创建executor的代码:

ExecutorService executorService =
    new ThreadPoolExecutor(
        maxThreads, // core thread pool size
        maxThreads, // maximum thread pool size
        1, // time to wait before resizing pool
        TimeUnit.MINUTES, 
        new ArrayBlockingQueue<Runnable>(maxThreads, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

 下面是程序的框架(极其简化版):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
    File currentDir = dirsToProcess.pop();

    // listing children
    File[] children = currentDir.listFiles();

    // processing children
    for (final File currentFile : children) {
        // if it&#39;s a directory, defer processing
        if (currentFile.isDirectory()) {
            dirsToProcess.add(currentFile);
            continue;
        }

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // if it&#39;s a file, process it
                    new ConvertTask(currentFile).perform();
                } catch (Exception ex) {
                    // error management logic
                }
            }
        });
    }
}

// ...
// wait for all of the executor threads to finish
executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the first try
        executorService.shutdownNow();
    }

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the second try
    }
} catch (InterruptedException ex) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}

总结

看到了吧,Java并发API非常简单易用,十分灵活,也很强大。真希望我多年前可以多花点功夫写一个这样简单的程序。这样我就可以在几小时内解决由传统单线程组件所引发的扩展性问题。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn