ForkJoin
Literally, Fork means to split and Join means to merge. The idea is to divide a large task into small tasks, compute each of them, and finally merge the results of the small tasks to obtain the result of the large task. The small tasks can be handed to different threads, which is the same divide-and-conquer idea that underlies distributed computing and resembles MapReduce offline computing in big data. One of the most classic applications of ForkJoin is Stream in Java 8. Streams come in two kinds, serial and parallel, and parallel streams are built on ForkJoin.
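As a small illustration of the serial and parallel streams mentioned above, the sketch below sums the same range both ways. The class and method names (StreamSumDemo, sequentialSum, parallelSum) are made up for this example; the only assumption about the JDK is that a parallel stream runs its work on the common ForkJoinPool by default.

```java
import java.util.stream.LongStream;

class StreamSumDemo {
    // Serial stream: one thread walks the whole range.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Parallel stream: the range is split into chunks whose partial sums
    // are computed concurrently and then combined.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum(1_000_000));
        System.out.println(parallelSum(1_000_000));
    }
}
```

Both calls return the same value; only the way the work is scheduled differs.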
Let's look at its two core classes, ForkJoinTask and ForkJoinPool.
The dependencies of ForkJoinTask itself are not complicated. Like the asynchronous task class FutureTask, it implements the Future interface.
Let's study the core source code of ForkJoinTask and see how tasks are computed by divide and conquer.
The core of ForkJoinTask is the fork() and join() methods.
fork()
Check whether the current thread is a ForkJoinWorkerThread.
If it is, push the task directly onto that thread's own work queue.
If it is not, call ForkJoinPool's externalPush method. ForkJoinPool holds a statically constructed shared pool object, common, and it is that object's externalPush() that is called here.
join()
Call doJoin() and wait for the task to finish executing.
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

    // The method that returns the result is implemented by subclasses
    public abstract V getRawResult();
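In the join() source above, a status other than NORMAL leads to reportException(s). The sketch below shows what that looks like from the caller's side, under the assumption that a task simply throws from compute(): join() rethrows an unchecked exception of the same type, while get() wraps it in an ExecutionException. FailingTask and JoinVersusGet are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinVersusGet {
    // join() surfaces the task's failure as an unchecked exception.
    static String viaJoin() {
        FailingTask task = new FailingTask();
        ForkJoinPool.commonPool().submit(task);
        try {
            task.join();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // get() follows the Future contract and wraps the failure.
    static String viaGet() {
        FailingTask task = new FailingTask();
        ForkJoinPool.commonPool().submit(task);
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by "
                    + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin());
        System.out.println(viaGet());
    }
}

// Hypothetical task whose computation always fails.
class FailingTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        throw new IllegalStateException("boom");
    }
}
```

Inside compute() methods join() is usually the more convenient of the two, since it needs no checked-exception handling.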
RecursiveTask is a subclass of ForkJoinTask. It mainly implements the method that returns the result and constrains the result type through generics. To define our own task, we extend RecursiveTask and implement its core computation method, compute().
    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;

        V result;

        protected abstract V compute();

        public final V getRawResult() {
            return result;
        }

        protected final void setRawResult(V value) {
            result = value;
        }

        protected final boolean exec() {
            result = compute();
            return true;
        }
    }
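A minimal sketch of a custom RecursiveTask, assuming we want to sum an int array: compute() either sums a small range directly or splits the range in two. SumTask, SumDemo and the THRESHOLD value are illustrative choices, not something taken from the JDK.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumDemo {
    static long parallelSum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data));
    }
}

// Sums data[from, to) by divide and conquer.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical cutoff for splitting
    private final int[] data;
    private final int from;
    private final int to;

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {       // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                        // schedule the left half
        long rightSum = right.compute();    // work on the right half ourselves
        return left.join() + rightSum;      // merge the two partial results
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of leaving it waiting on two joins.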
Many functions in ForkJoinTask depend on the ForkJoinPool thread pool, so a ForkJoinTask cannot run without a ForkJoinPool. ForkJoinPool has a lot in common with ThreadPoolExecutor; it is a thread pool dedicated to running ForkJoinTask tasks. I introduced thread pool technology in an earlier article; if you are interested, you can read it: Analyzing the implementation principles of thread pools (pooling technology) from the Java source code.
ForkJoinPool's inheritance hierarchy is almost the same as that of ThreadPoolExecutor: both extend AbstractExecutorService, so they are effectively siblings.
ForkJoinPool uses a work-stealing algorithm. Creating a new thread for every forked subtask would cost too much in system resources, so a thread pool must be used. An ordinary thread pool has a single task queue, but in a ForkJoinPool the subtasks forked from one task run in parallel, so to improve efficiency and reduce contention between threads these parallel tasks are placed in different queues. Because threads process different tasks at different speeds, a thread may finish all the tasks in its own queue early. To improve efficiency, that thread is then allowed to "steal" tasks from other queues. This is the so-called "work-stealing algorithm".
In an ordinary queue, elements enter at the tail and leave at the head. To support work stealing, a task queue must allow removal from both ends: the owning worker takes tasks from one end of its own queue while a stealing worker takes from the opposite end, which reduces conflicts between them. This calls for a double-ended queue (deque). In ForkJoinPool each worker owns an array-based WorkQueue: in the default LIFO mode the owner pushes and pops at top, and thieves take from base.
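The two-ended access pattern can be sketched with a plain java.util.ArrayDeque. This is a single-threaded toy, not how ForkJoinPool's WorkQueue is implemented (the real queue is a lock-free array with careful memory ordering); StealingSketch and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class StealingSketch {
    // Hypothetical stand-in for one worker's task queue (not thread-safe).
    private final Deque<String> tasks = new ArrayDeque<>();

    // The owner adds new work at one end...
    void push(String task) {
        tasks.addLast(task);
    }

    // ...and takes its next task from that same end (newest first).
    String pop() {
        return tasks.pollLast();
    }

    // A thief takes from the opposite end (oldest first),
    // so owner and thief rarely touch the same element.
    String steal() {
        return tasks.pollFirst();
    }

    public static void main(String[] args) {
        StealingSketch queue = new StealingSketch();
        queue.push("A");
        queue.push("B");
        queue.push("C");
        System.out.println("owner takes " + queue.pop());
        System.out.println("thief takes " + queue.steal());
    }
}
```

In a divide-and-conquer workload the oldest task in a queue is typically the largest remaining piece, so stealing it tends to give the thief a substantial amount of work per steal.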
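First, let's look at the constructors of ForkJoinPool. It offers three public constructors, the most complex of which takes four parameters. What do those four parameters mean?
int parallelism: the parallelism level (this is not the maximum number of threads that may exist)
ForkJoinWorkerThreadFactory factory: the factory that creates worker threads
UncaughtExceptionHandler handler: the handler for worker threads that terminate because of an unrecoverable error while executing tasks
boolean asyncMode: if true, worker queues are processed in FIFO order; if false (the default), in LIFO order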
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
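A short sketch of calling the four-argument constructor shown above and reading the configuration back. PoolConfigDemo and newPool are names invented for this example; passing null as the handler simply means no custom UncaughtExceptionHandler is installed.

```java
import java.util.concurrent.ForkJoinPool;

class PoolConfigDemo {
    // Builds a pool with an explicit parallelism level and queue mode.
    static ForkJoinPool newPool(int parallelism, boolean asyncMode) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // stock worker factory
                null,                                            // no custom handler
                asyncMode);                                      // true = FIFO, false = LIFO
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool(4, true);
        try {
            System.out.println("parallelism = " + pool.getParallelism());
            System.out.println("asyncMode   = " + pool.getAsyncMode());
        } finally {
            pool.shutdown();
        }
    }
}
```

FIFO mode is generally meant for event-style tasks that are forked but never joined; for recursive divide-and-conquer work the default LIFO mode is the usual choice.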
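externalSubmit is the most central method. It handles the first task submitted to the pool and performs secondary initialization. It also detects the first submission from an external thread and creates a new shared queue for it.
signalWork(ws, q) sends the work signal that sets the work queues in motion.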
    public ForkJoinTask<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.AdaptedRunnableAction(task);
        externalPush(job);
        return job;
    }

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putOrderedInt(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }

    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            if ((rs = runState) < 0) {
                tryTerminate(false, false);       // help terminate
                throw new RejectedExecutionException();
            }
            else if ((rs & STARTED) == 0 ||       // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                        int p = config & SMASK;   // ensure at least 2 slots
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                        workQueues = new WorkQueue[n];
                        ns = STARTED;
                    }
                } finally {
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false;    // initial submission or resizing
                    try {                         // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                      // move on failure
            }
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                q.hint = r;
                q.config = k | SHARED_QUEUE;
                q.scanState = INACTIVE;
                rs = lockRunState();              // publish index
                if (rs > 0 && (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                    // else terminated
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                      // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }
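From the caller's point of view, the submission path above is reached whenever a thread outside the pool calls submit. A minimal sketch, assuming a few Runnable jobs that only bump a counter (SubmitDemo and submitAndWait are illustrative names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitDemo {
    // Submits `jobs` Runnables from the calling (external) thread and waits for all.
    static int submitAndWait(int jobs) {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicInteger counter = new AtomicInteger();
        try {
            List<ForkJoinTask<?>> handles = new ArrayList<>();
            for (int i = 0; i < jobs; i++) {
                Runnable job = counter::incrementAndGet;
                // submit(Runnable) wraps the job in a ForkJoinTask and queues it.
                handles.add(pool.submit(job));
            }
            for (ForkJoinTask<?> handle : handles) {
                handle.join();              // block until that job has run
            }
            return counter.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait(5));
    }
}
```

The returned ForkJoinTask doubles as a Future, so get() with a timeout could be used instead of join() when the caller must not wait indefinitely.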
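After a task is submitted, signalWork(ws, q) sends the work signal. While too few workers are active, it loops: depending on the control variable ctl, it either wakes an idle worker or tries to add a worker (thread), which is created by the thread factory and then started. The same group of methods also handles retiring workers (deregisterWorker).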
    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int)c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                          // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                if (add) {
                    createWorker();
                    break;
                }
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);
        return false;
    }

    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        if (wt != null && (w = wt.workQueue) != null) {
            WorkQueue[] ws;                            // remove index from array
            int idx = w.config & SMASK;
            int rs = lockRunState();
            if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                ws[idx] = null;
            unlockRunState(rs, rs & ~RSLOCK);
        }
        long c;                                        // decrement counts
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                           (TC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & c))));
        if (w != null) {
            w.qlock = -1;                              // ensure set
            w.transferStealCount(this);
            w.cancelAll();                             // cancel remaining tasks
        }
        for (;;) {                                     // possibly replace
            WorkQueue[] ws; int m, sp;
            if (tryTerminate(false, false) || w == null || w.array == null ||
                (runState & STOP) != 0 || (ws = workQueues) == null ||
                (m = ws.length - 1) < 0)               // already terminating
                break;
            if ((sp = (int)(c = ctl)) != 0) {          // wake up replacement
                if (tryRelease(c, ws[sp & m], AC_UNIT))
                    break;
            }
            else if (ex != null && (c & ADD_WORKER) != 0L) {
                tryAddWorker(c);                       // create replacement
                break;
            }
            else                                       // don't need replacement
                break;
        }
        if (ex == null)                                // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                           // rethrow
            ForkJoinTask.rethrow(ex);
    }

    public static interface ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                            // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                     // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                     // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;   // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;                // odd-numbered indices
                if (ws[i] != null) {                   // collision
                    int probes = 0;                    // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                            // use as random seed
                w.config = i | mode;
                w.scanState = i;                       // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }
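Since createWorker() obtains its threads from the ForkJoinWorkerThreadFactory, a custom factory is one place where worker threads can be customized. The sketch below delegates to the default factory and only renames the thread; NamedWorkers, workerName and the prefix are hypothetical, and the approach assumes that renaming a worker before it starts is acceptable for your use.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class NamedWorkers {
    // Runs one job on a pool whose workers carry a custom name prefix,
    // and returns the name of the thread that executed it.
    static String workerName(String prefix) {
        AtomicInteger ids = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = p -> {
            // Let the default factory build the worker, then rename it.
            ForkJoinWorkerThread t =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
            t.setName(prefix + ids.getAndIncrement());
            return t;
        };
        ForkJoinPool pool = new ForkJoinPool(2, factory, null, false);
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName("sorter-"));
    }
}
```

Descriptive worker names make thread dumps and log lines easier to attribute when an application runs several pools.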
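Here we use the classic merge sort as an example and build our own ForkJoinTask. Following the merge sort approach, we override the core compute() method, submit the task with ForkJoinPool.submit(task), and obtain the result synchronously with get().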
    package com.zhj.interview;

    import java.util.Arrays;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    public class Test16 {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Fill a large array with random values
            int[] bigArr = new int[10000000];
            for (int i = 0; i < 10000000; i++) {
                bigArr[i] = (int) (Math.random() * 10000000);
            }
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            MyForkJoinTask task = new MyForkJoinTask(bigArr);
            long start = System.currentTimeMillis();
            // Submit the task and block until the sorted array is ready
            forkJoinPool.submit(task).get();
            long end = System.currentTimeMillis();
            System.out.println("Elapsed (ms): " + (end - start));
        }
    }

    class MyForkJoinTask extends RecursiveTask<int[]> {

        private final int[] source;

        public MyForkJoinTask(int[] source) {
            if (source == null) {
                throw new RuntimeException("Invalid argument: source must not be null");
            }
            this.source = source;
        }

        @Override
        protected int[] compute() {
            int l = source.length;
            // Zero or one element: already sorted
            if (l < 2) {
                return Arrays.copyOf(source, l);
            }
            // Two elements: swap them if they are out of order
            if (l == 2) {
                if (source[0] > source[1]) {
                    int[] tar = new int[2];
                    tar[0] = source[1];
                    tar[1] = source[0];
                    return tar;
                } else {
                    return Arrays.copyOf(source, l);
                }
            }
            // More than two elements: split in half, sort both halves in parallel, merge
            int mid = l / 2;
            MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
            task1.fork();
            MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
            task2.fork();
            int[] res1 = task1.join();
            int[] res2 = task2.join();
            return merge(res1, res2);
        }

        // Merge two sorted arrays into one sorted array
        private int[] merge(int[] res1, int[] res2) {
            int l1 = res1.length;
            int l2 = res2.length;
            int l = l1 + l2;
            int[] tar = new int[l];
            for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                // An exhausted array contributes Integer.MAX_VALUE as a sentinel
                int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                // If the condition holds, the next value comes from res1
                if (v1 < v2) {
                    tar[i] = v1;
                    i1++;
                } else {
                    tar[i] = v2;
                    i2++;
                }
            }
            return tar;
        }
    }
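That is the whole flow of submitting a task through ForkJoinPool and obtaining its result. Subtasks do not have to be split in two: you can follow the MapReduce pattern, or design the split flexibly to suit your specific needs.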
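As one possible illustration of a split that is not binary, the sketch below cuts an array into several chunks, runs all of them with invokeAll (the "map" step) and then combines the partial results (the "reduce" step). ChunkedMax, MaxTask and the chunks parameter are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ChunkedMax {
    // Finds the maximum of data by splitting it into roughly `chunks` pieces.
    static int max(int[] data, int chunks) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length, chunks));
    }

    public static void main(String[] args) {
        int[] data = {3, 9, -1, 7, 2, 8, 4};
        System.out.println(max(data, 3));
    }
}

class MaxTask extends RecursiveTask<Integer> {
    private final int[] data;
    private final int from;
    private final int to;
    private final int chunks;

    MaxTask(int[] data, int from, int to, int chunks) {
        this.data = data;
        this.from = from;
        this.to = to;
        this.chunks = chunks;
    }

    @Override
    protected Integer compute() {
        int len = to - from;
        if (chunks <= 1 || len <= chunks) {
            // Leaf: scan the range directly (an empty range yields Integer.MIN_VALUE).
            int best = Integer.MIN_VALUE;
            for (int i = from; i < to; i++) {
                best = Math.max(best, data[i]);
            }
            return best;
        }
        int size = (len + chunks - 1) / chunks;   // chunk length, rounded up
        List<MaxTask> parts = new ArrayList<>();
        for (int start = from; start < to; start += size) {
            parts.add(new MaxTask(data, start, Math.min(start + size, to), 1));
        }
        invokeAll(parts);                          // "map": run every chunk, wait for all
        int best = Integer.MIN_VALUE;
        for (MaxTask part : parts) {
            best = Math.max(best, part.join());    // "reduce": combine partial results
        }
        return best;
    }
}
```

A flat split like this suits work whose pieces are of similar size; recursive binary splitting usually balances better when the cost per element varies.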