ホームページ >Java >&#&チュートリアル >Java マルチスレッド同時プログラミングはデータ処理効率をどの程度向上させますか?

Java マルチスレッド同時プログラミングはデータ処理効率をどの程度向上させますか?

WBOY
WBOY転載
2023-04-28 23:46:051088ブラウズ

    作業シナリオでは、ホストの IP アドレスに基づいて他のモデルの関連情報を更新するという要件に遭遇しました。要件は非常にシンプルで、一般的なデータベース連携のクエリと更新操作のみを必要としますが、コーディング実装の過程で、ホストの数が多いため、クエリと更新のループに長い時間がかかることが判明しました。インターフェイスを 1 回呼び出すのに約 30 ~ 40 秒かかります。操作が完了するまでの分時間。

    したがって、インターフェイス メソッドの実行時間を効果的に短縮するには、マルチスレッド同時プログラミング手法の使用、マルチコア プロセッサの並列実行機能の活用、データの非同期処理を検討してください。実行時間を短縮し、効率を向上させます。

    ここでは、固定数のスレッドを持つ再利用可能なスレッド プール FixedThreadPool が使用され、CountDownLatch 同時実行ツール クラスによって提供される同時プロセス制御ツールが併用されます。マルチスレッドの同時実行性を確保するため プログラミング中の通常の操作:

    • 最初に、Runtime.getRuntime() を通じて実行中のマシンの CPU スレッドを取得します。 availableProcessors() メソッド Number。その後、固定スレッド プール内のスレッド数を設定するために使用されます。

    • 2 番目に、 タスクの特性を決定します。計算負荷の高いタスクの場合は、スレッド数を CPU スレッド番号 1# に設定します。 ##、IO の場合、集中的なタスクの場合、スレッド数を 2 * CPU スレッドの数 に設定します。この方法ではデータベースと頻繁に対話する必要があるため、IO 集中型のタスクになります。

    • その後、データがグループ化されて切り取られます。各スレッドは 1 つのグループ化されたデータを処理します。グループ化されたグループの数はスレッドの数と一致し、オブジェクト CountDownLatch はコンストラクターを呼び出し、初期化パラメーター値はスレッドの数であり、メイン スレッドはすべての子スレッドの実行が完了するまで待機してから後続の操作を実行します。

    • 次に、

      executorService.execute() メソッドを呼び出し、run メソッドを書き換えてビジネス ロジックを記述します。データ処理コードでは、現在のスレッドの実行後にカウンターを 1 ずつデクリメントすることを忘れないでください。 最後に、すべてのサブスレッドが完了したら、スレッド プールを閉じます。

    • 作業シナリオでビジネス ロジック コードを省略した後の一般的な処理方法の例は次のとおりです。
    public ResponseData updateHostDept() {
    		// ...
    		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
            // split the hostMapList for the following multi-threads task
            // return the number of logical CPUs
            int processorsNum = Runtime.getRuntime().availableProcessors();
            // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
            // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
            int threadNum = processorsNum * 2;  
            // the number of each group data 
            int eachGroupNum = hostMapList.size() / threadNum; 
            List<List<Map>> groupList = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                int start = i * eachGroupNum;
                if (i == threadNum - 1) {
                    int end = mapList.size();
                    groupList.add(hostMapList.subList(start, end));
                } else {
                    int end = (i+1) * eachGroupNum;
                    groupList.add(hostMapList.subList(start, end));
                }
            }
            // update data by using multi-threads asynchronously
            ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            for (List<Map> group : groupList) {
                executorService.execute(()->{
                    try {
                        for (Map map : group) {
                        	// update the data in mongodb
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                    	// let counter minus one 
                        countDownLatch.countDown();  
                    }
                });
            }
            try {
            	// main thread donnot execute until all child threads finish
                countDownLatch.await();  
            } catch (Exception e) {
                e.printStackTrace();
            }
            // remember to shutdown the threadPool
            executorService.shutdown();  
            return ResponseData.success();
    }

    次に、マルチスレッドの非同期更新戦略を使用した後、インターフェイスの呼び出しに必要なおおよその時間が

    30-40 min

    から 8-10 min に短縮され、実行効率が大幅に向上しました。

    ここでスレッド プールの作成に使用される
    newFixedThreadPool

    には、ブロッキング キューのデフォルトが無制限のキューになり、デフォルト値が であるという欠陥があることに注意してください。 Integer.MAX_VALUE は OOM 問題を引き起こす可能性が非常に高くなります。したがって、通常は ThreadPoolExecutor を使用してスレッド プールを作成し、待機キュー内のスレッドの数を指定して OOM 問題を回避できます。

    public ResponseData updateHostDept() {
    		// ...
    		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
            // split the hostMapList for the following multi-threads task
            // return the number of logical CPUs
            int processorsNum = Runtime.getRuntime().availableProcessors();
            // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
            // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
            int threadNum = processorsNum * 2;  
            // the number of each group data 
            int eachGroupNum = hostMapList.size() / threadNum; 
            List<List<Map>> groupList = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                int start = i * eachGroupNum;
                if (i == threadNum - 1) {
                    int end = mapList.size();
                    groupList.add(hostMapList.subList(start, end));
                } else {
                    int end = (i+1) * eachGroupNum;
                    groupList.add(hostMapList.subList(start, end));
                }
            }
            // update data by using multi-threads asynchronously
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<>(100));
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            for (List<Map> group : groupList) {
                executor.execute(()->{
                    try {
                        for (Map map : group) {
                        	// update the data in mongodb
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                    	// let counter minus one 
                        countDownLatch.countDown();  
                    }
                });
            }
            try {
            	// main thread donnot execute until all child threads finish
                countDownLatch.await();  
            } catch (Exception e) {
                e.printStackTrace();
            }
            // remember to shutdown the threadPool
            executor.shutdown();  
            return ResponseData.success();
    }

    上記のコードでは、コア スレッドの数とスレッドの最大数はそれぞれ 5 と 8 ですが、大きな値に設定すると、あまり大きな値には設定されません。コンテキストの切り替えにより時間の消費も増加しますが、マルチスレッドの利点を最大限に活用することはできません。適切なパラメータをどのように選択するかについては、マシンのパラメータとタスクの種類に基づいて決定する必要があります。

    最後に、コーディング以外の方法でマシンの CPU スレッド数を取得する場合も非常に簡単です。Windows システムでは、タスク マネージャーを通じて CPU スレッド数を確認できます。

    Java マルチスレッド同時プログラミングはデータ処理効率をどの程度向上させますか? 上の図からわかるように、私のマシンのコアは 8 個の CPU ですが、物理コアは 1 個です。 CPU コアは、ハイパースレッディング テクノロジを通じて 2 つの論理 CPU スレッドにシミュレートできるため、私のマシンは 8 コアと 16 スレッドをサポートします。

    以上がJava マルチスレッド同時プログラミングはデータ処理効率をどの程度向上させますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。