Home >Java >javaTutorial >How to implement multi-threaded program using Java?
1. Multi-thread execution task class
package com.visy.threadpool; import com.visy.executor.ExecutorFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Configuration; @Configuration public class ThreadPoolConfig { private TheadPoolProperties theadPoolProperties; private ThreadPoolExecutor executor; private ThreadPoolExecutor executorChild; public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) { this.theadPoolProperties = theadPoolProperties; this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); } public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) { if (taskList != null && !taskList.isEmpty()) { List<V> resultList = new ArrayList(); List futureList = null; try { if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) { throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size()); } if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) { futureList = executorChilds[0].invokeAll(taskList); } else { futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS); } } catch (InterruptedException var6) { var6.printStackTrace(); } this.doFutureList(resultList, futureList); return resultList; } else { return null; } } <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) { if (futureList != null) { Iterator var3 = futureList.iterator(); while(var3.hasNext()) { Future future = (Future)var3.next(); try { resultList.add(future.get()); } catch (ExecutionException | InterruptedException var6) { var6.printStackTrace(); } } } } public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) { if (taskList != null && !taskList.isEmpty()) { Iterator var2 = taskList.iterator(); while(var2.hasNext()) { Callable<V> call = (Callable)var2.next(); this.executor.submit(call); } } } public TheadPoolProperties getTheadPoolProperties() { return this.theadPoolProperties; } public ThreadPoolExecutor getExecutor() { return this.executor; } public ThreadPoolExecutor getExecutorChild() { return this.executorChild; } public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) { this.theadPoolProperties = theadPoolProperties; } public void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; } public void setExecutorChild(ThreadPoolExecutor executorChild) { this.executorChild = executorChild; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof ThreadPoolConfig)) { return false; } else { ThreadPoolConfig other = (ThreadPoolConfig)o; if (!other.canEqual(this)) { return false; } else { label47: { Object this$theadPoolProperties = this.getTheadPoolProperties(); Object other$theadPoolProperties = other.getTheadPoolProperties(); if (this$theadPoolProperties == null) { if (other$theadPoolProperties == null) { break label47; } } else if (this$theadPoolProperties.equals(other$theadPoolProperties)) { break label47; } return false; } Object this$executor = this.getExecutor(); Object other$executor = other.getExecutor(); if (this$executor == null) { if (other$executor != null) { return false; } } else if (!this$executor.equals(other$executor)) { return false; } Object this$executorChild = this.getExecutorChild(); Object other$executorChild = other.getExecutorChild(); if (this$executorChild == null) { if (other$executorChild != null) { return false; } } else if (!this$executorChild.equals(other$executorChild)) { return false; } return true; } } } protected boolean canEqual(Object other) { return other instanceof ThreadPoolConfig; } public int hashCode() { int PRIME = true; int result = 1; Object $theadPoolProperties = this.getTheadPoolProperties(); int result = result * 59 + ($theadPoolProperties == null ? 43 : $theadPoolProperties.hashCode()); Object $executor = this.getExecutor(); result = result * 59 + ($executor == null ? 43 : $executor.hashCode()); Object $executorChild = this.getExecutorChild(); result = result * 59 + ($executorChild == null ? 43 : $executorChild.hashCode()); return result; } public String toString() { return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties() + ", executor=" + this.getExecutor() + ", executorChild=" + this.getExecutorChild() + ")"; } }
2. Executor factory class
package com.visy.executor; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ExecutorFactory { private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class); private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap(); private static final int DEFAULT_QUEUE_SIZE = 1000; private static final String DEFAULT_EXECUTOR_NAME = "default-executor"; private static final int MAX_THREAD_NUM = 100; private static final int CORE_THREAD_NUM = 1; private static volatile ExecutorFactory instance; private ExecutorFactory() { } public static ExecutorFactory getInstance() { if (instance == null) { Class var0 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (instance == null) { instance = new ExecutorFactory(); } } } return instance; } public ThreadPoolExecutor getThreadPoolExecutorByName(String name) { return (ThreadPoolExecutor)threadPoolExecutorMap.get(name); } public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() { return threadPoolExecutorMap; } public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) { if (StringUtils.isBlank(threadPoolExecutorName)) { throw new IllegalArgumentException("thread name empty"); } else { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { Class var5 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor(); threadPoolExecutorMap.put(threadPoolExecutorName, executor); logger.info("thread name: {} executor created", threadPoolExecutorName); } } } return (ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName); } } public <T extends Runnable> void submit(T t) { ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); defaultExecutor.submit(t); } public <T extends Runnable> void submit(String poolName, T t) { ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); if (executor == null) { logger.error("thread name: {} executor not exist.", poolName); throw new IllegalArgumentException("thread name:" + poolName + " executor not exist."); } else { executor.submit(t); } } public <T extends Callable<Object>> Future<Object> submit(T t) { ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); return defaultExecutor.submit(t); } public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) { ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); if (executor == null) { logger.error("thread poolName: {} executor not exist.", poolName); throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist."); } else { return executor.submit(t); } } public ThreadPoolExecutor getThreadPoolExecutor() { return this.getThreadPoolExecutor("default-executor", 1000, 1, 100); } }
3. Multi-thread configuration class
package com.visy.threadpool; import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.validation.annotation.Validated; @Validated @Configuration @ConfigurationProperties(prefix = "visy.threadpool") public class TheadPoolProperties { // 执行并行任务时,等待多久时间超时(单位:秒) @NotNull private Integer timeOut; // 队列大小 @NotNull private Integer queueSize; // 核心线程数量 @NotNull private Integer coreThreadNum; // 线程池最大线程数量 @NotNull private Integer maxPoolSize; // 并行执行每组大小 private Integer groupSize = 20; public TheadPoolProperties() { } public Integer getTimeOut() { return this.timeOut; } public Integer getQueueSize() { return this.queueSize; } public Integer getCoreThreadNum() { return this.coreThreadNum; } public Integer getMaxPoolSize() { return this.maxPoolSize; } public Integer getGroupSize() { return this.groupSize; } public void setTimeOut(Integer timeOut) { this.timeOut = timeOut; } public void setQueueSize(Integer queueSize) { this.queueSize = queueSize; } public void setCoreThreadNum(Integer coreThreadNum) { this.coreThreadNum = coreThreadNum; } public void setMaxPoolSize(Integer maxPoolSize) { this.maxPoolSize = maxPoolSize; } public void setGroupSize(Integer groupSize) { this.groupSize = groupSize; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof TheadPoolProperties)) { return false; } else { TheadPoolProperties other = (TheadPoolProperties)o; if (!other.canEqual(this)) { return false; } else { label71: { Object this$timeOut = this.getTimeOut(); Object other$timeOut = other.getTimeOut(); if (this$timeOut == null) { if (other$timeOut == null) { break label71; } } else if (this$timeOut.equals(other$timeOut)) { break label71; } return false; } Object this$queueSize = this.getQueueSize(); Object other$queueSize = other.getQueueSize(); if (this$queueSize == null) { if (other$queueSize != null) { return false; } } else if (!this$queueSize.equals(other$queueSize)) { return false; } label57: { Object this$coreThreadNum = this.getCoreThreadNum(); Object other$coreThreadNum = other.getCoreThreadNum(); if (this$coreThreadNum == null) { if (other$coreThreadNum == null) { break label57; } } else if (this$coreThreadNum.equals(other$coreThreadNum)) { break label57; } return false; } Object this$maxPoolSize = this.getMaxPoolSize(); Object other$maxPoolSize = other.getMaxPoolSize(); if (this$maxPoolSize == null) { if (other$maxPoolSize != null) { return false; } } else if (!this$maxPoolSize.equals(other$maxPoolSize)) { return false; } Object this$groupSize = this.getGroupSize(); Object other$groupSize = other.getGroupSize(); if (this$groupSize == null) { if (other$groupSize == null) { return true; } } else if (this$groupSize.equals(other$groupSize)) { return true; } return false; } } } protected boolean canEqual(Object other) { return other instanceof TheadPoolProperties; } public int hashCode() { int PRIME = true; int result = 1; Object $timeOut = this.getTimeOut(); int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode()); Object $queueSize = this.getQueueSize(); result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode()); Object $coreThreadNum = this.getCoreThreadNum(); result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode()); Object $maxPoolSize = this.getMaxPoolSize(); result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode()); Object $groupSize = this.getGroupSize(); result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode()); return result; } public String toString() { return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")"; } }
4. List splitting tool class
package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /** * 列表或数组按指定大小分组,用于批量取一部分数据循环处理 * */ public class ArraySplitUtil<T> { /** * 按指定大小对列表分组 * @param list * @param splitSize * @return */ public List<List<T>> splistList(List<T> list, int splitSize) { if (null == list || list.size() == 0) { return null; } int listSize = list.size(); List<List<T>> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(list); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(list.subList(start, end)); } return newList; } /** * 按指定大小对数组分组 * @param array * @param splitSize * @return */ public List<T[]> splistArray(T[] array, int splitSize) { if (null == array) { return null; } int listSize = array.length; List<T[]> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(array); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(Arrays.copyOfRange(array, start, end)); } return newList; } public static <E> ArraySplitUtil<E> build(){ return new ArraySplitUtil<>(); } }
5. Multi-task execution assistant class
package com.visy.helper; import com.baomidou.mybatisplus.toolkit.CollectionUtils; import com.google.common.collect.Lists; import com.visy.utils.ArraySplitUtil; import com.visy.threadpool.ThreadPoolConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; /** * 多任务助手 * @author visy.wang * @date 2022/5/9 14:38 */ @Service public class MultiTaskHelper { @Autowired private ThreadPoolConfig threadPoolConfig; private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>(); public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){ return createAndRunListTask(list, null, handler); } public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){ return createAndRunListTask(list, handler, null); } public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){ createAndRunListTaskWithoutReturn(list, null, handler); } public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){ createAndRunListTaskWithoutReturn(list, handler, null); } /** * 把列表按线程数分组 * @param list 列表 * @return 分组后的列表 */ @SuppressWarnings("unchecked") private <T> List<List<T>> listSplit(List<T> list){ String key = list.get(0).getClass().getName(); int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize(); ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key); if(Objects.isNull(arraySplitUtil)){ arraySplitUtil = ArraySplitUtil.build(); ArraySplitUtilCache.put(key, arraySplitUtil); } return arraySplitUtil.splistList(list, groupSize); } /** * 创建并运行多任务 * @param list 输入数据列表 * @param handler1 处理器1 (优先级使用) * @param handler2 处理器2 * @param <I> 输入数据类型 * @param <O> 输出数据类型 * @return 执行结果分组列表 */ private <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<List<I>, List<O>> handler1, Function<I,O> handler2){ List<List<I>> listGroup = listSplit(list); //设定每个组的任务 List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ return handler1.apply(subList); }else if(Objects.nonNull(handler2)){ return subList.stream().map(handler2).collect(Collectors.toList()); }else{ return null; } }); }); return threadPoolConfig.doConcurrentTask(taskList); } /** * 创建并运行多任务(无返回结果) * @param list 输入数据列表 * @param handler1 处理器1 (优先级更高) * @param handler2 处理器2 * @param <I> 输入数据类型 */ private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){ List<List<I>> listGroup = listSplit(list); //设定每个组的任务 List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ handler1.accept(subList); }else if(Objects.nonNull(handler2)){ subList.forEach(handler2); } return null; }); }); threadPoolConfig.doConcurrentTask(taskList); } }
6. Multi-task assistant usage:
@Autowired package com.zoom.fleet.schedule.service; import com.visy.helper.MultiTaskHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * 多任务助手使用示例 * @author visy.wang * @date 2022/5/13 14:11 */ @Service public class MultiTaskTest { @Autowired private MultiTaskHelper multiTaskHelper; private void test(){ //待多任务执行的数据列表 List<String> idList = new ArrayList<>(); //1.有返回结果的执行方式一, 定义单个数据的处理逻辑,返回多任务执行结果和合集 List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{ //每一项数据的业务代码 return Long.valueOf(id); }); //2.有返回结果的执行方式二, 定义单个数线程的处理逻辑,返回多任务执行结果和合集 resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{ //每一个线程下列表操作的业务代码 return subIdList.stream().map(id->{ //每一项数据的业务代码 return Long.valueOf(id); }).collect(Collectors.toList()); }); //3.无返回结果的执行方式一, 定义单个数据的处理逻辑 multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{ //每一项数据的业务代码... }); //3.无返回结果的执行方式一, 定义单个数据的处理逻辑 multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{ subIdList.forEach(id->{ //每一项数据的业务代码... }); //继续操作subIdList... }); } }
The above is the detailed content of How to implement multi-threaded program using Java?. For more information, please follow other related articles on the PHP Chinese website!