ForkJoin Littéralement, Fork signifie bifurcation, et Join signifie combinaison. Nous pouvons le comprendre comme diviser les grandes tâches en petites tâches pour le calcul et la solution, et enfin diviser les petites tâches en Les résultats sont combinés. pour trouver la solution à la grande tâche. Ces petites tâches fissurées peuvent être confiées à différents threads pour le calcul. Ceci est similaire à l'informatique distribuée hors ligne MapReduce dans le Big Data. L'une des applications les plus classiques de ForkJoin est Stream en Java8. Nous savons que Stream est divisé en flux série et flux parallèle. ParallelStream s'appuie sur ForkJoin pour réaliser un traitement parallèle. de.
Jetons un coup d'œil aux noyaux ForkJoinTask
et ForkJoinPool
. ForkJoinTask
和ForkJoinPool
。
ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口
下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。
ForkJoinTask最核心的莫过于fork()和join()方法了。
fork()
判断当前线程是不是ForkJoinWorkerThread线程
是 直接将当前线程push到工作队列中
否 调用ForkJoinPool 的externalPush方法
在ForkJoinPool
构建了一个静态的common对象,这里调用的就是common
的externalPush()
join()
调用doJoin()方法,等待线程执行完成
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } // 获取结果的方法由子类实现 public abstract V getRawResult();
RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { private static final long serialVersionUID = 5232453952276485270L; V result; protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; } }
ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理
ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。
ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去“窃取”其它任务队列中的任务,这就是所谓的“工作窃取算法”。
对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足“工作窃取”的需求,任务队列应该支持从“队尾”出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。
首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?
int parallelism 可并行级别(不代表最多存在的线程数量)
ForkJoinWorkerThreadFactory factory 线程创建工厂
UncaughtExceptionHandler handler 异常捕获处理器
boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
下面我们看一下提交任务的方法:
externalPush
ForkJoinPool
Un objet commun statique est appelé ici la classe externalPush()
🎜🎜join()🎜public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putOrderedInt(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } externalSubmit(task); } private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }🎜RecursiveTask est une sous-classe de ForkJoinTask qui implémente principalement la méthode d'obtention des résultats et contraint les résultats via des génériques. Si nous devons créer une tâche nous-mêmes, nous devons toujours implémenter RecursiveTask et écrire la méthode de calcul de base calculate(). 🎜
final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } } private void tryAddWorker(long c) { boolean add = false; do { long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; if (add) { createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); return true; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex); return false; } final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array int idx = w.config & SMASK; int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } for (;;) { // possibly replace WorkQueue[] ws; int m, sp; if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); } public static interface ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool); } static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }🎜Pool de threads ForkJoinPool🎜🎜De nombreuses fonctions de ForkJoinTask dépendent du pool de threads ForkJoinPool, donc ForkJoinTask ne peut pas s'exécuter sans ForkJoinPool et présente de nombreuses similitudes avec ThreadPoolExecutor. introduisez la technologie du pool de threads. Si vous êtes intéressé, vous pouvez le lire - analyser le principe de mise en œuvre du pool de threads (technologie de pooling) à partir du code source Java🎜🎜La relation d'héritage entre ForkJoinPool et ThreadPoolExecutor est presque la même, ils sont assez similaires. relation fraternelle. 🎜🎜🎜
package com.zhj.interview; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class Test16 { public static void main(String[] args) throws ExecutionException, InterruptedException { int[] bigArr = new int[10000000]; for (int i = 0; i < 10000000; i++) { bigArr[i] = (int) (Math.random() * 10000000); } ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(bigArr); long start = System.currentTimeMillis(); forkJoinPool.submit(task).get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end-start)); } } class MyForkJoinTask extends RecursiveTask<int[]> { private int source[]; public MyForkJoinTask(int source[]) { if (source == null) { throw new RuntimeException("参数有误!!!"); } this.source = source; } @Override protected int[] compute() { int l = source.length; if (l < 2) { return Arrays.copyOf(source, l); } if (l == 2) { if (source[0] > source[1]) { int[] tar = new int[2]; tar[0] = source[1]; tar[1] = source[0]; return tar; } else { return Arrays.copyOf(source, l); } } if (l > 2) { int mid = l / 2; MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid)); task1.fork(); MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l)); task2.fork(); int[] res1 = task1.join(); int[] res2 = task2.join(); int tar[] = merge(res1, res2); return tar; } return null; } // 合并数组 private int[] merge(int[] res1, int[] res2) { int l1 = res1.length; int l2 = res2.length; int l = l1 + l2; int tar[] = new int[l]; for (int i = 0, i1 = 0, i2 = 0; i < l; i++) { int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1]; int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2]; // 如果条件成立,说明应该取数组array1中的值 if(v1 < v2) { tar[i] = v1; i1++; } else { tar[i] = v2; i2++; } } return tar; } }
externalPush
Cette méthode nous est très familière. C'est exactement lors du forking If. le thread actuel n'est pas ForkJoinWorkerThread, la nouvelle tâche soumise l'est également. La tâche sera effectuée via cette méthode. On peut voir que fork consiste à créer une nouvelle sous-tâche à soumettre. 🎜externalSubmit
是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。
signalWork
(ws, q)是发送工作信号,让工作队列进行运转。
public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putOrderedInt(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } externalSubmit(task); } private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
提交任务后,通过signalWork
(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。
final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } } private void tryAddWorker(long c) { boolean add = false; do { long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; if (add) { createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); return true; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex); return false; } final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array int idx = w.config & SMASK; int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } for (;;) { // possibly replace WorkQueue[] ws; int m, sp; if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); } public static interface ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool); } static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。
package com.zhj.interview; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class Test16 { public static void main(String[] args) throws ExecutionException, InterruptedException { int[] bigArr = new int[10000000]; for (int i = 0; i < 10000000; i++) { bigArr[i] = (int) (Math.random() * 10000000); } ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(bigArr); long start = System.currentTimeMillis(); forkJoinPool.submit(task).get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end-start)); } } class MyForkJoinTask extends RecursiveTask<int[]> { private int source[]; public MyForkJoinTask(int source[]) { if (source == null) { throw new RuntimeException("参数有误!!!"); } this.source = source; } @Override protected int[] compute() { int l = source.length; if (l < 2) { return Arrays.copyOf(source, l); } if (l == 2) { if (source[0] > source[1]) { int[] tar = new int[2]; tar[0] = source[1]; tar[1] = source[0]; return tar; } else { return Arrays.copyOf(source, l); } } if (l > 2) { int mid = l / 2; MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid)); task1.fork(); MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l)); task2.fork(); int[] res1 = task1.join(); int[] res2 = task2.join(); int tar[] = merge(res1, res2); return tar; } return null; } // 合并数组 private int[] merge(int[] res1, int[] res2) { int l1 = res1.length; int l2 = res2.length; int l = l1 + l2; int tar[] = new int[l]; for (int i = 0, i1 = 0, i2 = 0; i < l; i++) { int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1]; int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2]; // 如果条件成立,说明应该取数组array1中的值 if(v1 < v2) { tar[i] = v1; i1++; } else { tar[i] = v2; i2++; } } return tar; } }
通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!