ホームページ >Java >&#&チュートリアル >JavaスレッドプールExecutorの使用方法

JavaスレッドプールExecutorの使用方法

王林
王林転載
2023-04-28 10:01:061935ブラウズ

    スレッド プール クラス図

    JavaスレッドプールExecutorの使用方法

    スレッド プールを作成してスレッドを使用するために最も一般的に使用される Executor 実装は、主に次のとおりです。上記のクラス図で提供されるクラスを使用します。上のクラス図には、実行をスケジュールし、一連の実行戦略呼び出しに基づいて非同期タスクを制御するフレームワークである Executor フレームワークが含まれています。その目的は、タスクの送信とタスクの実行方法を分離するメカニズムを提供することです。これには 3 つのエグゼキュータ インターフェイスが含まれています:

    • Executor: 新しいタスクを実行するためのシンプルなインターフェイス

    • ExecutorService: Executor を拡張し、エグゼキュータの寿命を管理するメソッドを追加します。サイクルとタスクのライフ サイクル

    • ScheduleExcutorService: ExecutorService を拡張して、今後のタスクの定期的な実行をサポートします

    スレッド プールの利点

    • リソース消費の削減 - 既存のスレッドを再利用し、オブジェクトの作成と破棄のコストを削減し、優れたパフォーマンスを実現します

    • 応答を向上させますSpeed - 同時スレッドの最大数を効果的に制御し、システム リソースの使用率を向上させ、過剰なリソースの競合やブロックを回避できます。タスクが到着すると、スレッドの作成を待たずにすぐにタスクを実行できます。

    • スレッド管理性の向上-スケジュール実行、定期実行、単一実行を提供します。 -スレッド、同時実行制御およびその他の機能。

    新しいスレッドの欠点

    • 新しいスレッドが新しいオブジェクトを作成するたびに、パフォーマンスが低下します

    • スレッドの統合管理が行われていないと、新しいスレッドが無制限に作成され、互いに競合し、システム リソースを占有しすぎて、クラッシュや OOM (メモリ不足メモリ オーバーフロー) が発生する可能性があります。この問題の原因は、単なる問題ではありません。新しいスレッドが発生しますが、プログラムのバグや設計上の欠陥が原因である可能性があり、常に新しいスレッドが発生します。

    • 追加の実行、定期的な実行、スレッドの中断などのその他の機能がありません。

    スレッド プール コア クラス - ThreadPoolExecutor

    パラメータの説明: ThreadPoolExecutor には合計 7 つのパラメータがあり、これら 7 つのパラメータが連携してスレッド プールの強力な機能を形成します。

    corePoolSize: コア スレッドの数

    maximumPoolSize: 最大スレッド数

    workQueue:実行を待っているタスクを保存するブロッキング キューは非常に重要であり、スレッド プールの実行プロセスに大きな影響を与えます。

    新しいタスクをスレッド プールに送信すると、スレッド プールはプール内で現在実行されているスレッドの数に基づいて、タスクがどのように処理されるかを決定します。処理方法は 3 つあります:

    1. 直接切り替え (SynchronusQueue)

    2. アンバウンド キュー (LinkedBlockingQueue) で作成できるスレッドの最大数は corePoolSize です。 MaximumPoolSize は機能しません。スレッド プール内のすべてのコア スレッドが実行されている場合、新しいタスクの送信は待機キューに入れられます。

    3. 制限付きキュー (ArrayBlockingQueue) の最大 MaximumPoolSize を使用するとリソースの消費を削減できますが、この方法ではスレッド プールのスケジューリングがより困難になります。スレッド プールとキューの容量が限られているためです。したがって、スレッド プールと処理タスクのスループット レートを適切な範囲に達させたい場合、またスレッドのスケジューリングを比較的シンプルにしてリソース消費を可能な限り削減したい場合は、これら 2 つの量割り当て手法を合理的に制限する必要があります。 : [ CPU 使用率、オペレーティング システムのリソース消費、コンテキスト切り替えオーバーヘッドなどのリソース消費を削減したい場合は、より大きなキュー容量とより小さなスレッド プール容量を設定できます。これにより、スレッド プールのスループットが低下します。 。送信したタスクが頻繁にブロックされる場合は、maximumPoolSize を調整できます。キューの容量が小さい場合は、CPU 使用率が相対的に高くなるように、スレッド プールのサイズを大きく設定する必要があります。ただし、スレッドプールの容量を大きくしすぎてタスク数を増やしすぎると同時実行量が増加するため、スレッド間のスケジューリングを考慮する必要があります。これにより、タスク処理のスループットが低下する可能性があります。 ]

    keepAliveTime: タスクの実行がない場合 (スレッド内のスレッド数が corePoolSize より大きい場合、コアプール サイズがある場合) にスレッドが終了するまで保持される最大時間。現時点では、コア スレッド外のスレッドに新しいタスクが送信されていません。すぐには破棄されませんが、keepAliveTime を超えるまで待ちます)

    unit: keepAliveTime の時間単位

    threadFactory: スレッド ファクトリ、スレッドの作成に使用されます。スレッドを作成するためのデフォルト ファクトリがあるため、新しく作成されたスレッドは同じ優先順位を持ち、非デーモン スレッドであり、名前が設定されています)

    rejectHandler: タスク (ブロッキング キューがいっぱい) の拒否処理戦略 (AbortPolicy デフォルト ポリシーは例外を直接スローし、CallerRunsPolicy は呼び出し元のスレッドを使用してタスクを実行し、DiscardOldestPolicy はタスクを破棄します)キューの先頭のタスクを実行して現在のタスクを実行すると、DiscardPolicy は現在のタスクを直接破棄します)

    JavaスレッドプールExecutorの使用方法

    corePoolSize、maximumPoolSize、workQueue の関係: 実行中のスレッドの数が corePoolSize より少ない場合、タスクを処理するために新しいスレッドが直接作成されます。スレッド プール内の他のスレッドがアイドル状態であっても。実行中のスレッドの数が corePoolSize より大きく、maximumPoolSize より小さい場合、workQueue がいっぱいの場合にのみタスクを処理するための新しいスレッドが作成されます。 corePoolSizeとmaximumPoolSizeが同じ場合、作成されるスレッドプールのサイズは固定されます。このとき、新しいタスクがサブミットされ、workQueue が満杯でない場合、リクエストは workQueue に配置されます。空のスレッドが workQueue からタスクを削除するまで待ちます。この時点で workQueue もいっぱいである場合は、追加の拒否ポリシー パラメーターを使用して拒否ポリシーを実行します。

    初期化メソッド: 7 つのパラメータから 4 つの初期化メソッドで構成されます

    JavaスレッドプールExecutorの使用方法

    その他のメソッド:

    execute();	//提交任务,交给线程池执行	
    submit();//提交任务,能够返回执行结果 execute+Future
    shutdown();//关闭线程池,等待任务都执行完
    shutdownNow();//关闭线程池,不等待任务执行完
    getTaskCount();//线程池已执行和未执行的任务总数
    getCompleteTaskCount();//已完成的任务数量
    getPoolSize();//线程池当前的线程数量
    getActiveCount();//当前线程池中正在执行任务的线程数量

    スレッド プールのライフ サイクル:

    JavaスレッドプールExecutorの使用方法

    • running: 新しく送信されたタスクを受け入れることができ、ブロッキング キュー内のタスクも処理できます

    • shutdown: 処理できません新しいタスクですが、ブロックされたキュー内のタスクの処理を続行できます

    • #stop: 新しいタスクを受信できず、キュー内のタスクを処理できません

    • tidying: すべてのタスクが終了した場合、有効なスレッドの数は 0

    • terminated: Final state

    を使用してスレッド プールを作成します。 Executors

    Executors を使用して 4 つのスレッド プールを作成できます。上記の 4 つのスレッド プール初期化メソッドに対応します。

    Executors.newCachedThreadPool

    newCachedThreadPool は、新規のスレッド プールを作成します。必要に応じてスレッドを実行する タスクがサブミットされると、corePoolSize は 0 になり、コア スレッドは作成されません。SynchronousQueue は要素を格納しないキューです。キューは常に満杯であるため、最終的には非コア スレッドが作成されることがわかります. タスクを実行します。非コアスレッドは、60 秒間アイドル状態になるとリサイクルされます。 Integer.MAX_VALUE は非常に大きいため、スレッドを無限に作成できると考えられ、リソースが限られている場合に OOM 例外が発生しやすくなります。

    //创建newCachedThreadPool线程池源码
    public static ExecutorService newCachedThreadPool() {
    		/**
            *corePoolSize: 0,核心线程池的数量为0
    		*maximumPoolSize:  Integer.MAX_VALUE,可以认为最大线程数是无限的
    		*keepAliveTime: 60L
    		*unit: 秒
    		*workQueue: SynchronousQueue
            **/
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    ユースケース:

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
    }

    newCachedThreadPool の戻り値は ExecutorService タイプであり、これには基本的なスレッド プール メソッドのみが含まれ、スレッド監視関連のメソッドは含まれないことに注意してください。 、ExecutorService の戻り値を持つスレッド プール タイプを使用して新しいスレッドを作成するときは、特定の状況を考慮する必要があります。

    JavaスレッドプールExecutorの使用方法

    Executors.newSingleThreadExecutor

    newSingleThreadExecutor は、コア スレッドが 1 つだけあるシングルスレッド スレッド プールであり、タスクの実行に 1 つの共有スレッドのみを使用して、すべてのスレッドが確実に実行されるようにします。タスクは指定どおりに実行されます シーケンシャル実行 (FIFO、優先順位...)

    //newSingleThreadExecutor创建线程池源码
    public static ExecutorService newSingleThreadExecutor() {
        /**
          *  corePoolSize : 1,核心线程池的数量为1
    
          *  maximumPoolSize : 1,只可以创建一个非核心线程
    
          *  keepAliveTime : 0L
    
          *  unit => 秒
    
          *  workQueue => LinkedBlockingQueue
          **/
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    タスクがサブミットされると、タスクを実行するために最初にコア スレッドが作成されます。コア スレッドの数を超えると、コア スレッドが作成されます。 LinkedBlockingQueue は Integer の長さのキューであるため、キューに入れられます。MAX_VALUE は無制限のキューと見なされるため、無限の数のタスクがキューに挿入される可能性があり、リソースが制限されている場合に OOM 例外が発生しやすくなります。同時に、無制限のキューのため、maximumPoolSize パラメーターと keepAliveTime パラメーターは無効になり、まったく機能しなくなり、非コア スレッドが作成されます。

    Executors.newFixedThreadPool

    固定長スレッド プール、コア スレッドの数、および最大スレッド数はユーザーによって渡されます。最大同時スレッド数を設定できます。それを超えたらキューで待つ

    //newFixedThreadPool创建线程池源码
    public static ExecutorService newFixedThreadPool(int nThreads) {
        	/**
              *  corePoolSize : 核心线程的数量为自定义输入nThreads
    
              *  maximumPoolSize : 最大线程的数量为自定义输入nThreads
    
              *  keepAliveTime : 0L
    
              *  unit : 秒
    
              *  workQueue : LinkedBlockingQueue
              **/
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    newFixedThreadPoolはSingleThreadExecutorと似ていますが、唯一の違いはコアスレッド数が異なることと、LinkedBlockingQueueを使用しているため、リソースが限られている場合にOOM例外が発生しやすいことです。

    Executors.newScheduledThreadPool

    固定長スレッド プール。コア スレッドの数はユーザーによって渡され、スケジュールされた定期的なタスクの実行をサポートします

    //newScheduledThreadPool创建线程池源码
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        /**
          *  corePoolSize : 核心线程的数量为自定义输入corePoolSize
    
          *  maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE
    
          *  keepAliveTime : 0L
    
          *  unit : 纳秒
    
          *  workQueue : DelayedWorkQueue
          **/
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    タスクが実行されるときが送信され、corePoolSize が自動的に入力を定義します。コア スレッドがいっぱいになった後、最初にコア スレッドが作成されるため、最終的にはタスクを実行するために非コア スレッドが作成されます。芯以外の糸は使用後リサイクルされます。 Integer.MAX_VALUE は非常に大きいため、スレッドを無限に作成できると考えられ、リソースが限られている場合に OOM 例外が発生しやすくなります。使用される DelayedWorkQueue は、スケジュールされた定期的なタスクを実装できるためです。 ScheduledExecutorService には、使用できる 3 つのメソッドが用意されています。

    JavaスレッドプールExecutorの使用方法

    schedule: 遅延後にタスクを実行します。scheduleAtFixedRate: 指定された速度でタスクを実行します。scheduleWithFixedDelay: 指定された遅延でタスクを実行します。使用例:

        public static void main(String[] args) {
    
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    
    //        executorService.schedule(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("schedule run");
    //            }
    //         //延迟3秒后执行
    //        }, 3, TimeUnit.SECONDS);
            //        executorService.shutdown();
    
    //        executorService.scheduleWithFixedDelay(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("scheduleWithFixedDelay run");
    //            }
    //            //延迟一秒后每隔3秒执行
    //        }, 1, 3, TimeUnit.SECONDS);
            
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    log.warn("schedule run");
                }
                //延迟一秒后每隔3秒执行
            }, 1, 3, TimeUnit.SECONDS);
    
            /**
             * 定时器调度,不推荐使用,推荐ScheduledExecutorService调度
             */
    //        Timer timer = new Timer();
    //        timer.schedule(new TimerTask() {
    //            @Override
    //            public void run() {
    //                log.warn("timer run");
    //            }
    //        //从当前时间每隔5秒执行
    //        }, new Date(), 5 * 1000);
        }

    summary

    • FixedThreadPool および SingleThreadExecutor の許可されるリクエスト キューの長さは Integer.MAX_VALUE であり、大量のリクエストが蓄積され、OOM 例外が発生する可能性があります

    • CachedThreadPool および newScheduledThreadPool によって作成できるスレッドの数は Integer.MAX_VALUE です。これにより、大量のスレッドが作成され、OOM 例外が発生する可能性があります

    これが、その使用が禁止されている理由です。Executor がスレッド プールを作成するが、ThreadPoolExecutor を自分で作成することを推奨する理由です。

    スレッド プール パラメータの定義方法

    CPU 負荷が高い:スレッド プールのサイズは、CPU 数 1 にすることをお勧めします。CPU 数は、Runtime.availableProcessors メソッドに従って取得できます。 IO-集約型: CPU 数 * CPU 使用率 * (待機中の 1 スレッド)時間/スレッド CPU 時間) 混合タイプ: タスクを CPU 集中型と IO 集中型に分割し、処理に異なるスレッド プールを使用するため、各スレッド プールは独自のワークロードに応じて調整できます。 ブロッキング キュー: 境界付きキューを使用することをお勧めします。キューはリソースの枯渇を避けるのに役立ちます。 拒否ポリシー: デフォルトのポリシーは、AbortPolicy 拒否ポリシーであり、RejectedExecutionException を直接スローします。プログラム [これは実行時例外であり、キャッチを強制しないため] この処理は十分にエレガントではありません。拒否を処理するには、次の戦略をお勧めします。

    • プログラム内で RejectedExecutionException 例外をキャッチし、キャッチされた例外内のタスクを処理します。デフォルトの拒否ポリシーには、CallerRunsPolicy 拒否ポリシーを使用します。

    • このポリシーは、実行のために実行を呼び出すスレッド (通常はメイン スレッド) にタスクを渡します。メインスレッドは一定期間実行できなくなります。タスクを送信すると、ワーカースレッドが実行中のタスクを処理します。この時点で送信されたスレッドは、TCP キューに保存されます。TCP キューがいっぱいの場合、クライアントに影響します。これは、軽いパフォーマンスの低下です。

    • カスタム拒否ポリシーのみRejectedExecutionHandler インターフェイスを使用できます

    • #タスクが特に重要でない場合は、DiscardPolicy および DiscardOldestPolicy 拒否戦略を使用してタスクを破棄することもできます

    エグゼキュータを使用する場合静的メソッドは ThreadPoolExecutor オブジェクトを作成します。セマフォを使用してタスクの実行を制限し、OOM 例外を回避できます

    以上がJavaスレッドプールExecutorの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。