1. マルチスレッド実行タスク クラス
package com.visy.threadpool;

import com.visy.executor.ExecutorFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;

/**
 * Runs batches of {@link Callable} tasks on thread pools obtained from {@link ExecutorFactory}.
 *
 * <p>Two pools are created in the constructor from the same {@link TheadPoolProperties}:
 * {@code "ThreadPoolConfig-service"} (used by default) and
 * {@code "ThreadPoolConfig-service-child"} (exposed through {@link #getExecutorChild()}).
 */
@Configuration
public class ThreadPoolConfig {

    private TheadPoolProperties theadPoolProperties;
    private ThreadPoolExecutor executor;
    private ThreadPoolExecutor executorChild;

    /**
     * Creates the two executors from the supplied properties.
     *
     * @param theadPoolProperties queue size, core/max thread counts and timeout settings
     */
    public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) {
        this.theadPoolProperties = theadPoolProperties;
        this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor(
                "ThreadPoolConfig-service",
                theadPoolProperties.getQueueSize(),
                theadPoolProperties.getCoreThreadNum(),
                theadPoolProperties.getMaxPoolSize());
        this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor(
                "ThreadPoolConfig-service-child",
                theadPoolProperties.getQueueSize(),
                theadPoolProperties.getCoreThreadNum(),
                theadPoolProperties.getMaxPoolSize());
    }

    /**
     * Runs all tasks and blocks until they have finished (or, on the default executor, until the
     * configured timeout in seconds has elapsed), then collects the results.
     *
     * <p>If the first element of {@code executorChilds} is non-null the tasks run on that executor
     * without a timeout; otherwise they run on the default executor with the configured timeout.
     *
     * @param taskList       tasks to run
     * @param executorChilds optional; only the first element is considered
     * @param <V>            task result type
     * @return results of the tasks that completed normally; an empty list if the calling thread
     *         was interrupted while waiting; {@code null} when {@code taskList} is null or empty
     * @throws RuntimeException if the default executor's queue already holds at least the
     *                          configured queue size
     */
    public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) {
        if (taskList == null || taskList.isEmpty()) {
            return null;
        }

        List<V> resultList = new ArrayList<>();
        List<Future<V>> futureList = null;
        try {
            int queued = this.executor.getQueue().size();
            int limit = this.theadPoolProperties.getQueueSize();
            if (queued >= limit) {
                // Report the configured limit instead of a hard-coded number.
                throw new RuntimeException("queue size bigger than " + limit + ", now size is " + queued);
            }

            if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) {
                futureList = executorChilds[0].invokeAll(taskList);
            } else {
                futureList = this.executor.invokeAll(
                        taskList, this.theadPoolProperties.getTimeOut().longValue(), TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }

        this.doFutureList(resultList, futureList);
        return resultList;
    }

    /**
     * Appends the result of every future that completed normally to {@code resultList}.
     * Futures that failed or were cancelled are skipped.
     *
     * @param resultList destination list
     * @param futureList futures to read; may be {@code null}, in which case nothing happens
     * @param <V>        result type
     */
    <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) {
        if (futureList == null) {
            return;
        }
        for (Future<V> future : futureList) {
            try {
                resultList.add(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (CancellationException e) {
                // Futures cancelled by the timed invokeAll throw this from get(); skip them like
                // failed tasks so one timeout does not discard every other result.
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Submits every task to the default executor without waiting for completion.
     *
     * @param taskList tasks to submit; null or empty is a no-op
     * @param <V>      task result type (results are not collected)
     */
    public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return;
        }
        for (Callable<V> call : taskList) {
            this.executor.submit(call);
        }
    }

    public TheadPoolProperties getTheadPoolProperties() {
        return this.theadPoolProperties;
    }

    public ThreadPoolExecutor getExecutor() {
        return this.executor;
    }

    public ThreadPoolExecutor getExecutorChild() {
        return this.executorChild;
    }

    public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) {
        this.theadPoolProperties = theadPoolProperties;
    }

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public void setExecutorChild(ThreadPoolExecutor executorChild) {
        this.executorChild = executorChild;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof ThreadPoolConfig)) {
            return false;
        }
        ThreadPoolConfig other = (ThreadPoolConfig) o;
        return other.canEqual(this)
                && Objects.equals(this.getTheadPoolProperties(), other.getTheadPoolProperties())
                && Objects.equals(this.getExecutor(), other.getExecutor())
                && Objects.equals(this.getExecutorChild(), other.getExecutorChild());
    }

    /** Lets subclasses opt out of equality with this type. */
    protected boolean canEqual(Object other) {
        return other instanceof ThreadPoolConfig;
    }

    @Override
    public int hashCode() {
        final int prime = 59;
        int result = 1;
        result = result * prime + nullSafeHash(this.getTheadPoolProperties());
        result = result * prime + nullSafeHash(this.getExecutor());
        result = result * prime + nullSafeHash(this.getExecutorChild());
        return result;
    }

    /** Hash of {@code value}, or 43 when it is null. */
    private static int nullSafeHash(Object value) {
        return value == null ? 43 : value.hashCode();
    }

    @Override
    public String toString() {
        return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties()
                + ", executor=" + this.getExecutor()
                + ", executorChild=" + this.getExecutorChild() + ")";
    }
}
2. エグゼキューター ファクトリ クラス
package com.visy.executor;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton that creates {@link ThreadPoolExecutor} instances on demand and keeps them in a
 * static map keyed by name, so that asking for the same name twice yields the same executor.
 */
public class ExecutorFactory {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);
    private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap<>();
    private static final int DEFAULT_QUEUE_SIZE = 1000;
    private static final String DEFAULT_EXECUTOR_NAME = "default-executor";
    private static final int MAX_THREAD_NUM = 100;
    private static final int CORE_THREAD_NUM = 1;
    private static volatile ExecutorFactory instance;

    private ExecutorFactory() {
    }

    /**
     * Returns the single factory instance, creating it on first use while holding the class lock.
     */
    public static ExecutorFactory getInstance() {
        if (instance != null) {
            return instance;
        }
        synchronized (ExecutorFactory.class) {
            if (instance == null) {
                instance = new ExecutorFactory();
            }
        }
        return instance;
    }

    /**
     * Looks up an already registered executor.
     *
     * @param name executor name
     * @return the executor, or {@code null} if none is registered under that name
     */
    public ThreadPoolExecutor getThreadPoolExecutorByName(String name) {
        return threadPoolExecutorMap.get(name);
    }

    /** Returns the backing name-to-executor map itself (not a copy). */
    public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() {
        return threadPoolExecutorMap;
    }

    /**
     * Returns the executor registered under the given name, creating and registering it first
     * if the name is not present yet.
     *
     * @param threadPoolExecutorName name of the executor; must not be blank
     * @param queueSize              passed to {@code ThreadPool} on creation
     * @param coreThreadNum          passed to {@code ThreadPool} on creation
     * @param maxPoolSize            passed to {@code ThreadPool} on creation
     * @return the executor registered under the name
     * @throws IllegalArgumentException if the name is blank
     */
    public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) {
        if (StringUtils.isBlank(threadPoolExecutorName)) {
            throw new IllegalArgumentException("thread name empty");
        }
        if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {
            registerIfMissing(threadPoolExecutorName, queueSize, coreThreadNum, maxPoolSize);
        }
        return threadPoolExecutorMap.get(threadPoolExecutorName);
    }

    /**
     * Creates and registers the executor unless another thread registered it in the meantime.
     * Being {@code static synchronized}, this holds the {@code ExecutorFactory.class} monitor.
     */
    private static synchronized void registerIfMissing(String poolName, int queueSize, int coreThreadNum, int maxPoolSize) {
        if (threadPoolExecutorMap.containsKey(poolName)) {
            return;
        }
        ThreadPool pool = new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, poolName);
        threadPoolExecutorMap.put(poolName, pool.getExecutor());
        logger.info("thread name: {} executor created", poolName);
    }

    /** Submits a runnable to the default executor. */
    public <T extends Runnable> void submit(T t) {
        this.getThreadPoolExecutor().submit(t);
    }

    /**
     * Submits a runnable to the named executor.
     *
     * @throws IllegalArgumentException if no executor is registered under {@code poolName}
     */
    public <T extends Runnable> void submit(String poolName, T t) {
        ThreadPoolExecutor target = this.getThreadPoolExecutorByName(poolName);
        if (target == null) {
            logger.error("thread name: {} executor not exist.", poolName);
            throw new IllegalArgumentException("thread name:" + poolName + " executor not exist.");
        }
        target.submit(t);
    }

    /** Submits a callable to the default executor and returns its future. */
    public <T extends Callable<Object>> Future<Object> submit(T t) {
        return this.getThreadPoolExecutor().submit(t);
    }

    /**
     * Submits a callable to the named executor and returns its future.
     *
     * @throws IllegalArgumentException if no executor is registered under {@code poolName}
     */
    public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) {
        ThreadPoolExecutor target = this.getThreadPoolExecutorByName(poolName);
        if (target == null) {
            logger.error("thread poolName: {} executor not exist.", poolName);
            throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist.");
        }
        return target.submit(t);
    }

    /** Returns the default executor, creating it with the default settings on first use. */
    public ThreadPoolExecutor getThreadPoolExecutor() {
        return this.getThreadPoolExecutor(DEFAULT_EXECUTOR_NAME, DEFAULT_QUEUE_SIZE, CORE_THREAD_NUM, MAX_THREAD_NUM);
    }
}
3. マルチスレッド構成クラス
package com.visy.threadpool; import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.validation.annotation.Validated; @Validated @Configuration @ConfigurationProperties(prefix = "visy.threadpool") public class TheadPoolProperties { // 执行并行任务时,等待多久时间超时(单位:秒) @NotNull private Integer timeOut; // 队列大小 @NotNull private Integer queueSize; // 核心线程数量 @NotNull private Integer coreThreadNum; // 线程池最大线程数量 @NotNull private Integer maxPoolSize; // 并行执行每组大小 private Integer groupSize = 20; public TheadPoolProperties() { } public Integer getTimeOut() { return this.timeOut; } public Integer getQueueSize() { return this.queueSize; } public Integer getCoreThreadNum() { return this.coreThreadNum; } public Integer getMaxPoolSize() { return this.maxPoolSize; } public Integer getGroupSize() { return this.groupSize; } public void setTimeOut(Integer timeOut) { this.timeOut = timeOut; } public void setQueueSize(Integer queueSize) { this.queueSize = queueSize; } public void setCoreThreadNum(Integer coreThreadNum) { this.coreThreadNum = coreThreadNum; } public void setMaxPoolSize(Integer maxPoolSize) { this.maxPoolSize = maxPoolSize; } public void setGroupSize(Integer groupSize) { this.groupSize = groupSize; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof TheadPoolProperties)) { return false; } else { TheadPoolProperties other = (TheadPoolProperties)o; if (!other.canEqual(this)) { return false; } else { label71: { Object this$timeOut = this.getTimeOut(); Object other$timeOut = other.getTimeOut(); if (this$timeOut == null) { if (other$timeOut == null) { break label71; } } else if (this$timeOut.equals(other$timeOut)) { break label71; } return false; } Object this$queueSize = this.getQueueSize(); Object other$queueSize = other.getQueueSize(); if (this$queueSize == null) { if (other$queueSize != null) 
{ return false; } } else if (!this$queueSize.equals(other$queueSize)) { return false; } label57: { Object this$coreThreadNum = this.getCoreThreadNum(); Object other$coreThreadNum = other.getCoreThreadNum(); if (this$coreThreadNum == null) { if (other$coreThreadNum == null) { break label57; } } else if (this$coreThreadNum.equals(other$coreThreadNum)) { break label57; } return false; } Object this$maxPoolSize = this.getMaxPoolSize(); Object other$maxPoolSize = other.getMaxPoolSize(); if (this$maxPoolSize == null) { if (other$maxPoolSize != null) { return false; } } else if (!this$maxPoolSize.equals(other$maxPoolSize)) { return false; } Object this$groupSize = this.getGroupSize(); Object other$groupSize = other.getGroupSize(); if (this$groupSize == null) { if (other$groupSize == null) { return true; } } else if (this$groupSize.equals(other$groupSize)) { return true; } return false; } } } protected boolean canEqual(Object other) { return other instanceof TheadPoolProperties; } public int hashCode() { int PRIME = true; int result = 1; Object $timeOut = this.getTimeOut(); int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode()); Object $queueSize = this.getQueueSize(); result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode()); Object $coreThreadNum = this.getCoreThreadNum(); result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode()); Object $maxPoolSize = this.getMaxPoolSize(); result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode()); Object $groupSize = this.getGroupSize(); result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode()); return result; } public String toString() { return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")"; } }
4. リスト分割ツール クラス
package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /** * 列表或数组按指定大小分组,用于批量取一部分数据循环处理 * */ public class ArraySplitUtil<T> { /** * 按指定大小对列表分组 * @param list * @param splitSize * @return */ public List<List<T>> splistList(List<T> list, int splitSize) { if (null == list || list.size() == 0) { return null; } int listSize = list.size(); List<List<T>> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(list); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(list.subList(start, end)); } return newList; } /** * 按指定大小对数组分组 * @param array * @param splitSize * @return */ public List<T[]> splistArray(T[] array, int splitSize) { if (null == array) { return null; } int listSize = array.length; List<T[]> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(array); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(Arrays.copyOfRange(array, start, end)); } return newList; } public static <E> ArraySplitUtil<E> build(){ return new ArraySplitUtil<>(); } }
5. マルチタスク実行アシスタント クラス
package com.visy.helper; import com.baomidou.mybatisplus.toolkit.CollectionUtils; import com.google.common.collect.Lists; import com.visy.utils.ArraySplitUtil; import com.visy.threadpool.ThreadPoolConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; /** * 多任务助手 * @author visy.wang * @date 2022/5/9 14:38 */ @Service public class MultiTaskHelper { @Autowired private ThreadPoolConfig threadPoolConfig; private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>(); public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){ return createAndRunListTask(list, null, handler); } public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){ return createAndRunListTask(list, handler, null); } public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){ createAndRunListTaskWithoutReturn(list, null, handler); } public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){ createAndRunListTaskWithoutReturn(list, handler, null); } /** * 把列表按线程数分组 * @param list 列表 * @return 分组后的列表 */ @SuppressWarnings("unchecked") private <T> List<List<T>> listSplit(List<T> list){ String key = list.get(0).getClass().getName(); int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize(); ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key); if(Objects.isNull(arraySplitUtil)){ arraySplitUtil = ArraySplitUtil.build(); ArraySplitUtilCache.put(key, arraySplitUtil); } return arraySplitUtil.splistList(list, groupSize); } /** * 创建并运行多任务 * @param list 输入数据列表 * @param handler1 处理器1 
(优先级使用) * @param handler2 处理器2 * @param <I> 输入数据类型 * @param <O> 输出数据类型 * @return 执行结果分组列表 */ private <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<List<I>, List<O>> handler1, Function<I,O> handler2){ List<List<I>> listGroup = listSplit(list); //设定每个组的任务 List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ return handler1.apply(subList); }else if(Objects.nonNull(handler2)){ return subList.stream().map(handler2).collect(Collectors.toList()); }else{ return null; } }); }); return threadPoolConfig.doConcurrentTask(taskList); } /** * 创建并运行多任务(无返回结果) * @param list 输入数据列表 * @param handler1 处理器1 (优先级更高) * @param handler2 处理器2 * @param <I> 输入数据类型 */ private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){ List<List<I>> listGroup = listSplit(list); //设定每个组的任务 List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ handler1.accept(subList); }else if(Objects.nonNull(handler2)){ subList.forEach(handler2); } return null; }); }); threadPoolConfig.doConcurrentTask(taskList); } }
6. マルチタスク アシスタントの使用法:
@Autowired package com.zoom.fleet.schedule.service; import com.visy.helper.MultiTaskHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * 多任务助手使用示例 * @author visy.wang * @date 2022/5/13 14:11 */ @Service public class MultiTaskTest { @Autowired private MultiTaskHelper multiTaskHelper; private void test(){ //待多任务执行的数据列表 List<String> idList = new ArrayList<>(); //1.有返回结果的执行方式一, 定义单个数据的处理逻辑,返回多任务执行结果和合集 List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{ //每一项数据的业务代码 return Long.valueOf(id); }); //2.有返回结果的执行方式二, 定义单个数线程的处理逻辑,返回多任务执行结果和合集 resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{ //每一个线程下列表操作的业务代码 return subIdList.stream().map(id->{ //每一项数据的业务代码 return Long.valueOf(id); }).collect(Collectors.toList()); }); //3.无返回结果的执行方式一, 定义单个数据的处理逻辑 multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{ //每一项数据的业务代码... }); //3.无返回结果的执行方式一, 定义单个数据的处理逻辑 multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{ subIdList.forEach(id->{ //每一项数据的业务代码... }); //继续操作subIdList... }); } }
以上が「Javaを使用してマルチスレッドプログラムを実装するにはどうすればよいですか?」の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。