How much does Java multi-threaded concurrent programming improve data processing efficiency?

WBOY
2023-04-28 23:46:05

    In a work scenario, we encountered the following requirement: update related information in other models based on each host's IP address. The requirement is simple and involves only ordinary linked database queries and updates. During implementation, however, we found that because the number of hosts is large, looping through the queries and updates takes a long time: a single call to the interface needs about 30-40 minutes to complete.

    To shorten the execution time of the interface method, we considered multi-threaded concurrent programming: using the parallel execution capability of multi-core processors to process the data asynchronously, which can greatly shorten the execution time and improve efficiency.

    A reusable thread pool with a fixed number of threads, FixedThreadPool, is used here, together with the CountDownLatch concurrency utility class, which coordinates the threads so that the multi-threaded processing completes correctly:

    • First, obtain the number of CPU threads (logical processors) of the running machine through the Runtime.getRuntime().availableProcessors() method. This value is later used to set the number of threads in the fixed thread pool.

    • Second, determine the characteristics of the task. If it is a compute-intensive task, set the number of threads to the number of CPU threads + 1; if it is an IO-intensive task, set the number of threads to 2 * the number of CPU threads. Since the method interacts frequently with the database, it is an IO-intensive task.

    • Next, split the data into groups, with each thread processing one group, so the number of groups equals the number of threads. Also create a CountDownLatch counter object, passing the number of threads to its constructor as the initial count, so that the main thread waits for all child threads to finish before performing subsequent operations.

    • Then, call the executorService.execute() method and override the run method with the business logic and data-processing code. Remember to decrement the counter by 1 when each thread finishes its work. Finally, when all child threads have completed, shut down the thread pool.

    With the business logic from the work scenario omitted, the general approach looks like this:

    public ResponseData updateHostDept() {
    		// ...
    		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
            // split the hostMapList for the following multi-threads task
            // return the number of logical CPUs
            int processorsNum = Runtime.getRuntime().availableProcessors();
            // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
            // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
            int threadNum = processorsNum * 2;  
            // the number of each group data 
            int eachGroupNum = hostMapList.size() / threadNum; 
            List<List<Map>> groupList = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                int start = i * eachGroupNum;
                if (i == threadNum - 1) {
                    int end = hostMapList.size();
                    groupList.add(hostMapList.subList(start, end));
                } else {
                    int end = (i+1) * eachGroupNum;
                    groupList.add(hostMapList.subList(start, end));
                }
            }
            // update data by using multi-threads asynchronously
            ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            for (List<Map> group : groupList) {
                executorService.execute(()->{
                    try {
                        for (Map map : group) {
                        	// update the data in mongodb
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // decrement the counter by one
                        countDownLatch.countDown();  
                    }
                });
            }
            try {
                // the main thread waits here until all child threads finish
                countDownLatch.await();
            } catch (InterruptedException e) {
                // restore the interrupt flag so callers can see the interruption
                Thread.currentThread().interrupt();
            }
            // remember to shutdown the threadPool
            executorService.shutdown();  
            return ResponseData.success();
    }
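
    One caveat about the grouping step above: when the list holds fewer elements than threadNum, integer division makes eachGroupNum zero, so every group except the last is empty and a single thread does all the work. The following is a minimal sketch of a more balanced split. The class name ListPartitioner and the method partition are hypothetical helpers introduced for illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

public class ListPartitioner {

    /**
     * Splits a list into at most `parts` contiguous sublists whose sizes
     * differ by at most one. Empty groups are never produced, so a short
     * list simply yields fewer groups.
     */
    public static <T> List<List<T>> partition(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        int groupCount = Math.min(parts, items.size());
        List<List<T>> groups = new ArrayList<>(groupCount);
        if (groupCount == 0) {
            return groups;
        }
        int baseSize = items.size() / groupCount;
        int remainder = items.size() % groupCount;
        int start = 0;
        for (int i = 0; i < groupCount; i++) {
            // The first `remainder` groups take one extra element.
            int end = start + baseSize + (i < remainder ? 1 : 0);
            groups.add(items.subList(start, end));
            start = end;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> hosts = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            hosts.add(i);
        }
        // prints [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
        System.out.println(partition(hosts, 4));
    }
}
```

    If a helper like this is used, the CountDownLatch should be created with the number of groups actually returned (groups.size()) rather than threadNum; otherwise the main thread would wait for countdowns that never happen.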

    After switching to the multi-threaded asynchronous update strategy, the approximate time required to call the interface dropped from 30-40 min to 8-10 min, greatly improving execution efficiency.

    Note that creating the thread pool with newFixedThreadPool has a flaw: its blocking queue is effectively unbounded, with a default capacity of Integer.MAX_VALUE, so a backlog of queued tasks can cause OOM problems. It is therefore generally better to create the thread pool with ThreadPoolExecutor directly, which lets you specify the capacity of the waiting queue and avoid OOM problems.

    public ResponseData updateHostDept() {
    		// ...
    		List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
            // split the hostMapList for the following multi-threads task
            // return the number of logical CPUs
            int processorsNum = Runtime.getRuntime().availableProcessors();
            // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
            // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
            int threadNum = processorsNum * 2;  
            // the number of each group data 
            int eachGroupNum = hostMapList.size() / threadNum; 
            List<List<Map>> groupList = new ArrayList<>();
            for (int i = 0; i < threadNum; i++) {
                int start = i * eachGroupNum;
                if (i == threadNum - 1) {
                    int end = hostMapList.size();
                    groupList.add(hostMapList.subList(start, end));
                } else {
                    int end = (i+1) * eachGroupNum;
                    groupList.add(hostMapList.subList(start, end));
                }
            }
            // update data by using multi-threads asynchronously
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<>(100));
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            for (List<Map> group : groupList) {
                executor.execute(()->{
                    try {
                        for (Map map : group) {
                        	// update the data in mongodb
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // decrement the counter by one
                        countDownLatch.countDown();  
                    }
                });
            }
            try {
                // the main thread waits here until all child threads finish
                countDownLatch.await();
            } catch (InterruptedException e) {
                // restore the interrupt flag so callers can see the interruption
                Thread.currentThread().interrupt();
            }
            // remember to shutdown the threadPool
            executor.shutdown();  
            return ResponseData.success();
    }
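
    A bounded queue brings one more thing to consider: with the default rejection handler (AbortPolicy), execute() throws RejectedExecutionException once both the queue and the maximum number of threads are in use. The sketch below shows one possible way to handle that, using the built-in ThreadPoolExecutor.CallerRunsPolicy so the submitting thread runs the overflow tasks itself. The class name BoundedPoolDemo, the method runTasks, and the pool sizes are illustrative assumptions, and the counter increment only stands in for the real database update.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    /** Runs taskCount trivial tasks on a small bounded pool and returns how many completed. */
    public static int runTasks(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                // When the queue and the pool are both full, the submitting
                // thread runs the task itself instead of rejecting it.
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {
                    try {
                        completed.incrementAndGet(); // stands in for the database update
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption.
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // 200 tasks exceed the queue capacity of 10, yet none are rejected.
        System.out.println(runTasks(200)); // prints 200
    }
}
```

    CallerRunsPolicy slows down the submitting thread when the pool is saturated, which acts as simple back-pressure. Whether that is preferable to failing fast depends on the application.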

    In the above code, the core pool size and the maximum pool size are 5 and 8 respectively. They are not set to very large values because too many threads cause frequent context switching between threads, which adds time overhead and prevents multi-threading from delivering its full benefit. How to choose appropriate parameters depends on the machine's specifications and the type of task.
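
    As a starting point for choosing these values, the rule of thumb given earlier in this article (number of CPU threads + 1 for compute-intensive tasks, 2 * number of CPU threads for IO-intensive tasks) can be written down as a small helper. This is only a sketch: the class name PoolSizing and its methods are hypothetical, and the results should be treated as initial guesses to be tuned by measurement.

```java
public class PoolSizing {

    /** Rule of thumb for compute-intensive tasks: logical CPUs + 1. */
    public static int cpuBoundThreads(int logicalCpus) {
        requirePositive(logicalCpus);
        return logicalCpus + 1;
    }

    /** Rule of thumb for IO-intensive tasks: 2 * logical CPUs. */
    public static int ioBoundThreads(int logicalCpus) {
        requirePositive(logicalCpus);
        return logicalCpus * 2;
    }

    private static void requirePositive(int logicalCpus) {
        if (logicalCpus < 1) {
            throw new IllegalArgumentException("logicalCpus must be at least 1");
        }
    }

    public static void main(String[] args) {
        // Number of logical processors available to the JVM on this machine.
        int logicalCpus = Runtime.getRuntime().availableProcessors();
        System.out.println("compute-intensive: " + cpuBoundThreads(logicalCpus));
        System.out.println("IO-intensive: " + ioBoundThreads(logicalCpus));
    }
}
```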

    Finally, if you want to find the number of CPU threads of the machine without writing code, that is also simple. On Windows, open Task Manager and select the "Performance" tab to view the number of CPU threads, as shown in the picture below:

    [Image: Windows Task Manager, "Performance" tab]

    As the picture above shows, my machine has eight physical CPU cores. Hyper-threading technology lets one physical core appear as two logical CPU threads, so my machine supports 8 cores and 16 threads.

    Statement:
    This article is reproduced from yisu.com.