工作場景中遇到這樣一個需求:根據主機的 IP 位址連動更新其他模型的相關資訊。需求很簡單,只涉及一般的資料庫連動查詢以及更新操作,然而在編碼實現過程中發現,由於主機的數量很多,導致循環遍歷查詢、更新時花費很長的時間,調用一次接口大概需要30-40 min 時間才能完成操作。
因此,為了有效縮短介面方法的執行時間,便考慮使用多執行緒並發程式設計方法,利用多核心處理器並行執行的能力,透過非同步處理資料的方式,便可以大大縮短執行時間,提高執行效率。
這裡使用可重複使用固定執行緒數的執行緒池FixedThreadPool
,並使用CountDownLatch
並發工具類別提供的並發流程控制工具作為配合使用,保證多執行緒並發程式設計過程中的正常運作:
首先,透過Runtime.getRuntime().availableProcessors()
方法取得執行機器的CPU 執行緒數,用於後續設定固定線程池的執行緒數量。
其次,判斷任務的特性,若為運算密集型任務設定執行緒數為CPU 執行緒數1
,如果為IO密集型任務則設定執行緒數為2 * CPU 執行緒數
,由於在方法中需要與資料庫進行頻繁的交互,因此屬於IO 密集型任務。
之後,對資料進行分組切割,每個執行緒處理一個分組的數據,分組的組數與執行緒數保持一致,並且還要建立計數器物件CountDownLatch
,呼叫建構函數,初始化參數值為執行緒數個數,保證主執行緒等待所有子執行緒執行結束後,再進行後續的操作。
然後,呼叫executorService.execute()
方法,重寫run
方法編寫業務邏輯與資料處理程式碼,執行完當前執行緒後記得將計數器減1操作。 最後,當所有子執行緒執行完成後,關閉執行緒池。
在省略工作場景中的業務邏輯程式碼後,通用的處理方法範例如下所示:
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executorService.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executorService.shutdown(); return ResponseData.success(); }
那麼在使用多執行緒非同步更新的策略後,從當初呼叫介面所需的大概時間為30-40 min
下降到了8-10 min
,大大提高了執行效率。
要注意的是,這裡使用的
newFixedThreadPool
建立執行緒池,它有一個缺陷就是,它的阻塞佇列預設是一個無界佇列,預設值為Integer.MAX_VALUE
極有可能會造成OOM 問題。因此,一般可以使用ThreadPoolExecutor
來建立執行緒池,自己可以指定等待佇列中的執行緒個數,避免產生 OOM 問題。
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executor.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executor.shutdown(); return ResponseData.success(); }
在上面的程式碼中,核心執行緒數和最大執行緒數分別為5 和8,並沒有設定的很大的值,因為如果如果設定的很大,執行緒間頻繁的上下文切換也會增加時間消耗,反而無法最大程度上發揮多執行緒的優勢。至於如何選擇合適的參數,則需要根據機器的參數以及任務的類型綜合考慮決定。
最後補充一點,如果想要透過非編碼的方式取得機器的CPU 執行緒個數也很簡單,windows 系統透過任務管理器,選擇“效能”,便可以查看CPU 執行緒個數的情況,如下圖所示:
從上圖可以看到,我的機器中核心是八個CPU,但是透過超線程技術一個物理的CPU 核心可以模擬成兩個邏輯CPU 線程,因此我的機器是支援8核心16線程的。
以上是Java多執行緒並發程式設計提高資料處理效率是多少的詳細內容。更多資訊請關注PHP中文網其他相關文章!