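At work I ran into the following requirement: update related information in other models based on each host's IP address. The requirement itself is simple and involves only ordinary linked database queries and updates. During implementation, however, I found that because there are so many hosts, looping over them to query and update one record at a time takes a long time: a single call to the interface needs roughly 30-40 minutes to finish.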
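First, call the Runtime.getRuntime().availableProcessors() method to get the number of CPU threads (logical processors) on the machine running the code. This value is used later to set the number of threads in the fixed thread pool.

Second, consider the nature of the task. For a compute-intensive task, set the thread count to the number of CPU threads + 1; for an IO-intensive task, set it to 2 * the number of CPU threads. Because this method interacts with the database frequently, it is an IO-intensive task.

Then, write the business logic and data-processing code inside the task passed to the execute() method, and remember to decrement the counter by 1 when the current thread finishes its work. Finally, once all child threads have completed, shut down the thread pool.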
    public ResponseData updateHostDept() {
        // ...
        List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");

        // Split hostMapList into groups for the multi-threaded task below.
        // Number of logical CPUs on this machine.
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // For IO-bound tasks use 2 * (number of logical CPUs);
        // for compute-bound tasks use (number of logical CPUs) + 1.
        int threadNum = processorsNum * 2;
        // Number of records in each group.
        int eachGroupNum = hostMapList.size() / threadNum;
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                // The last group takes all remaining records.
                int end = hostMapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i + 1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }

        // Update the data asynchronously with multiple threads.
        // The pool has threadNum / 2 worker threads; the remaining groups wait in its queue.
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum / 2);
        // One count per group, so await() returns only after every group is done.
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executorService.execute(() -> {
                try {
                    for (Map map : group) {
                        // update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Decrement the counter by one, even if the update failed.
                    countDownLatch.countDown();
                }
            });
        }
        try {
            // The main thread does not continue until all child threads finish.
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Remember to shut down the thread pool.
        executorService.shutdown();
        return ResponseData.success();
    }
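The method above depends on project classes (ResponseData, mongoTemplate) that are not shown, so it cannot be run on its own. The following is a minimal, self-contained sketch of the same split-then-wait pattern. The class name BatchUpdateSketch, the helpers partition and processAll, and the use of a plain integer list in place of the MongoDB records are all assumptions made for illustration; the counter increment stands in for the real per-record update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BatchUpdateSketch {

    // Split the list into exactly `groups` consecutive sublists;
    // the last sublist absorbs the remainder.
    static <T> List<List<T>> partition(List<T> items, int groups) {
        int each = items.size() / groups;
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            int start = i * each;
            int end = (i == groups - 1) ? items.size() : (i + 1) * each;
            result.add(items.subList(start, end));
        }
        return result;
    }

    // Process every item on a fixed pool, block until all groups are done,
    // and return how many items were processed.
    static int processAll(List<Integer> items, int threadNum) {
        List<List<Integer>> groups = partition(items, threadNum);
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(groups.size());
        AtomicInteger processed = new AtomicInteger();
        try {
            for (List<Integer> group : groups) {
                pool.execute(() -> {
                    try {
                        for (Integer item : group) {
                            // Hypothetical stand-in for the per-record database update.
                            processed.incrementAndGet();
                        }
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1003; i++) {
            items.add(i);
        }
        int threadNum = Runtime.getRuntime().availableProcessors() * 2;
        System.out.println(processAll(items, threadNum));
    }
}
```

Placing countDown() in a finally block matters here: if a task threw before counting down, await() would never return.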
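After switching to the multi-threaded asynchronous update strategy, the time for one call to the interface dropped from roughly 30-40 minutes to 8-10 minutes.

However, a pool created with Executors.newFixedThreadPool uses an unbounded waiting queue, so if tasks are submitted faster than they complete, the queue can keep growing and is very likely to cause an OOM problem. Therefore, it is generally better to create the thread pool with ThreadPoolExecutor directly, where you can specify the capacity of the waiting queue yourself and avoid the OOM problem.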
    public ResponseData updateHostDept() {
        // ...
        List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");

        // Split hostMapList into groups for the multi-threaded task below.
        // Number of logical CPUs on this machine.
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // For IO-bound tasks use 2 * (number of logical CPUs);
        // for compute-bound tasks use (number of logical CPUs) + 1.
        int threadNum = processorsNum * 2;
        // Number of records in each group.
        int eachGroupNum = hostMapList.size() / threadNum;
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                // The last group takes all remaining records.
                int end = hostMapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i + 1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }

        // Update the data asynchronously with multiple threads.
        // 5 core threads, at most 8 threads, 30 s idle timeout, queue bounded at 100 tasks.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
        // One count per group, so await() returns only after every group is done.
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executor.execute(() -> {
                try {
                    for (Map map : group) {
                        // update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Decrement the counter by one, even if the update failed.
                    countDownLatch.countDown();
                }
            });
        }
        try {
            // The main thread does not continue until all child threads finish.
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Remember to shut down the thread pool.
        executor.shutdown();
        return ResponseData.success();
    }
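To make the effect of a bounded queue concrete, here is a small sketch of what happens when such a pool is saturated. It is an illustration under assumptions rather than part of the original code: the class name BoundedQueueSketch and the helper countRejected are hypothetical, and the pool is deliberately tiny (one thread) so that the behavior is easy to follow. With the default rejection policy, execute() throws RejectedExecutionException once the worker is busy and the queue is full, instead of letting pending tasks accumulate in memory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

    // Submit `tasks` blocking jobs to a one-thread pool whose queue holds
    // `queueCapacity` tasks, and return how many submissions were rejected.
    static int countRejected(int tasks, int queueCapacity) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            // Hold the worker so that later tasks pile up in the queue.
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Thrown by the default AbortPolicy when the pool is saturated.
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task running + 2 queued, so the remaining 2 of 5 are rejected.
        System.out.println(countRejected(5, 2));
    }
}
```

In the method above, a rejected group would make execute() throw and the remaining groups would never be submitted, so the queue capacity plus the maximum pool size should be at least the number of groups.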
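In the code above, the core pool size and the maximum pool size are 5 and 8 respectively. They are deliberately not set to large values: if they were very large, frequent context switching between threads would add overhead and the benefits of multithreading could not be fully realized. As for how to choose suitable parameters, that has to be decided by weighing the machine's specifications together with the type of task.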
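As one possible starting point, the rule of thumb from earlier in this post can be turned into a small helper that derives the pool size from the machine instead of hard-coding it. This is only a sketch: the class name PoolSizingSketch and the helpers suggestedThreads and newBoundedPool are hypothetical, and the queue capacity of 100 and the 30-second keep-alive are simply carried over from the example above rather than tuned values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    // Rule of thumb used in this post:
    // IO-bound -> 2 * logical CPUs, compute-bound -> logical CPUs + 1.
    static int suggestedThreads(int logicalCpus, boolean ioBound) {
        return ioBound ? logicalCpus * 2 : logicalCpus + 1;
    }

    // Fixed-size pool (core == max) backed by a bounded queue.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        int threads = suggestedThreads(cpus, true);
        ThreadPoolExecutor pool = newBoundedPool(threads, 100);
        System.out.println("logical CPUs=" + cpus
                + ", pool size=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

The numbers this produces are a first guess; measuring the real workload is still needed before settling on final values.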
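One last note: it is also easy to find the number of CPU threads on a machine without writing code. On Windows, open Task Manager and select "Performance" to see the CPU thread count, as shown in the figure below:

As the figure above shows, my machine has eight physical CPU cores, but with hyper-threading each physical core can present itself as two logical CPU threads, so my machine supports 8 cores and 16 threads.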