ホームページ >Java >&#&チュートリアル >ThreadPoolExecutor を使用して Java で独立したシングルスレッド タスクを並列実行する方法の詳細な紹介

ThreadPoolExecutor を使用して Java で独立したシングルスレッド タスクを並列実行する方法の詳細な紹介

黄舟
黄舟オリジナル
2017-03-23 11:06:443012ブラウズ

タスク実行フレームワークは Java SE 5.0 で導入されました。これは、マルチスレッド プログラミング開発の簡素化において大きな進歩です。このフレームワークを使用すると、タスクを簡単に管理できます。タスクの ライフ サイクル と実行戦略を管理します。

この記事では、簡単な例を使用して、このフレームワークによってもたらされる柔軟性とシンプルさを示します。

基本

実行フレームワークには、タスクの実行を管理するためのExecutorインターフェイスが導入されています。 Executor は、実行可能なタスクを送信するために使用されるインターフェイスです。このインターフェイスは、タスクの送信をタスクの実行から分離します。異なる実行戦略を持つエグゼキュータはすべて、同じ送信インターフェイスを実装します。実行戦略を変更しても、タスク送信ロジックには影響しません。

Runnable オブジェクトを実行のために送信する場合、それは非常に簡単です:

Executor exec = …;
exec.execute(runnable);

スレッド プール

前に述べたように、実行プログラムが送信された実行可能タスクをどのように実行するかは Executor インターフェイスでは指定されておらず、実行プログラムに依存します。特定のタイプを使用します。このフレームワークはいくつかの異なるエグゼキュータを提供し、実行戦略はシナリオごとに異なります。

使用できる最も一般的なエグゼキュータのタイプは、ThreadPoolExecutor クラス (およびそのサブクラス) のインスタンスであるスレッド プール エグゼキュータです。 ThreadPoolExecutor は、スレッド プールとワーク キューを管理します。スレッド プールには、タスクの実行に使用されるワーカー スレッドが格納されます。

他のテクノロジーにおける「プール」の概念は理解できたはずです。 「プール」を使用する最大の利点の 1 つは、リソースの作成コストを削減し、使用して解放した後、再利用できることです。もう 1 つの間接的な利点は、使用されるリソースの量を制御できることです。たとえば、システム リソースを損なうことなく、スレッド プールのサイズを調整して、必要な負荷を達成できます。

このフレームワークは、スレッド プールを作成するための Executor と呼ばれるファクトリ クラスを提供します。このエンジニアリング クラスを使用すると、さまざまな特性を持つスレッド プールを作成できます。多くの場合、基礎となる実装は同じ (ThreadPoolExecutor) ですが、ファクトリ クラスを使用すると、複雑な

コンストラクター を使用せずにスレッド プールを迅速にセットアップできます。エンジニアリング クラスのファクトリ メソッドは次のとおりです。

  • newFixedThreadPool: このメソッドは、最大容量が固定されたスレッド プールを返します。新しいスレッドはオンデマンドで作成され、スレッドの数は構成された数を超えません。スレッド数が最大値に達しても、スレッド プールは変更されません。

  • newCachedThreadPool: このメソッドは無制限のスレッド プールを返します。つまり、最大数の制限はありません。ただし、ワークロードが減少すると、このタイプのスレッド プールは未使用のスレッドを破棄します。

  • newSingleThreadedExecutor: このメソッドは、すべてのタスクが単一のスレッドで実行されることを保証できるエグゼキューターを返します。

  • newScheduledThreadPool: このメソッドは、遅延タスクおよびスケジュールされたタスクの実行をサポートする固定サイズのスレッド プールを返します。

これはほんの始まりにすぎません。この記事の範囲を超えた他にも Executor の使用方法がいくつかあります。以下について学習することを強くお勧めします:

  • ExecutorService インターフェースによって宣言されるライフサイクル管理メソッド (shutdown() や awaitTermination() など) )。

  • CompletionServiceを使用してタスクのステータスをクエリし、戻り値がある場合は戻り値を取得します。

ExecutorService インターフェイスは、スレッド プールをシャットダウンし、使用されなくなったリソースが確実にクリーンアップされるようにするメソッドを提供するため、特に重要です。幸いなことに、ExecutorService インターフェイスは非常にシンプルで一目瞭然なので、そのドキュメントを包括的に学習することをお勧めします。

大まかに言えば、shutdown() メッセージを ExecutorService に送信すると、新しく送信されたタスクは受信されませんが、キュー内に残っているタスクは引き続き処理されます。 isTerminated() を使用して ExecutorService の終了ステータスをクエリしたり、awaitTermination(...) メソッドを使用して ExecutorService が終了するのを待機したりできます。最大タイムアウトをパラメータとして渡すと、awaitTermination メソッドは永久に待機しません。

警告: JVM プロセスは決して終了しないという理解には、いくつかの誤りと混乱があります。 executorService を閉じずに基礎となるスレッドを破棄するだけの場合、JVM は終了しません。最後の通常のスレッド (非デーモン スレッド) が終了すると、JVM も終了します。

ThreadPoolExecutor の構成

Executor ファクトリ クラスを使用せず、手動で ThreadPoolExecutor を作成する場合は、コンストラクターを使用して作成および構成する必要があります。以下は、このクラスで最も広く使用されているコンストラクターの 1 つです:

public ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAlive,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler);

ご覧のとおり、以下を設定できます:

  • コア プールのサイズ (スレッド プールが使用するサイズ)

  • 最大プールサイズ

  • 生存時間、アイドルスレッドはこの時間を過ぎると破棄されます

  • タスクを保存するワークキュー

  • タスクの送信が拒否された後に実行される戦略

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。如果队列填满,以后提交的任务会被“拒绝”。

  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为什么这个类名中引用动词“rejected”。你可以实现自己的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

  • 悄悄地丢弃一个任务

  • 丢弃最旧的任务,重新提交最新的

  • 在调用者的线程中执行被拒绝的任务

什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,我们有了食谱的所有原料,我们需要做的仅仅是把程序建立起来并调节。在写代码前,我们必须了解下程序的负载。我列一下我检测到的内容:

  • 有非常多的文件需要被周期性地扫描:每个目录包含1~2百万个文件

  • 扫描算法很快,可以并行化

  • 处理一个文件至少需要1s,甚至上升到2s或3s

  • 处理文件时,性能瓶颈主要是CPU

  • CPU利用率必须可调,根据一天时间的不同而使用不同的负载配置。

我需要这样一个线程池,它的大小在程序运行的时候通过负载配置来设置。我倾向于根据负载策略创建一个固定大小的线程池。由于线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其他资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后我需要使用阻塞队列创建一个ThreadPoolExecutor,可以限制提交的任务数。为什么?是这样,扫描算法执行很快,很快就产生庞大数量需要处理的文件。数量有多庞大呢?很难预测,因为变动太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

而且,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。为什么?因为当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会停止,转而去处理一个文件,处理结束后马上又会扫描目录。

下面是创建executor的代码:

ExecutorService executorService =
    new ThreadPoolExecutor(
        maxThreads, // core thread pool size
        maxThreads, // maximum thread pool size
        1, // time to wait before resizing pool
        TimeUnit.MINUTES, 
        new ArrayBlockingQueue<Runnable>(maxThreads, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

 下面是程序的框架(极其简化版):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
    File currentDir = dirsToProcess.pop();

    // listing children
    File[] children = currentDir.listFiles();

    // processing children
    for (final File currentFile : children) {
        // if it&#39;s a directory, defer processing
        if (currentFile.isDirectory()) {
            dirsToProcess.add(currentFile);
            continue;
        }

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // if it&#39;s a file, process it
                    new ConvertTask(currentFile).perform();
                } catch (Exception ex) {
                    // error management logic
                }
            }
        });
    }
}

// ...
// wait for all of the executor threads to finish
executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the first try
        executorService.shutdownNow();
    }

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the second try
    }
} catch (InterruptedException ex) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}

总结

看到了吧,Java并发API非常简单易用,十分灵活,也很强大。真希望我多年前可以多花点功夫写一个这样简单的程序。这样我就可以在几小时内解决由传统单线程组件所引发的扩展性问题。

以上がThreadPoolExecutor を使用して Java で独立したシングルスレッド タスクを並列実行する方法の詳細な紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。