Heim  >  Artikel  >  Java  >  Fork/Join-Framework und Aufrufmethode in Java

Fork/Join-Framework und Aufrufmethode in Java

WBOY
WBOYnach vorne
2023-05-08 11:10:071620Durchsuche

    Was ist ForkJoin?

    ForkJoin Wörtlich bedeutet Fork Gabelung und Join Kombination. Wir können es als Aufteilung verstehen Eine große Aufgabe in kleine Aufgaben zur Berechnung und Lösung aufteilen und schließlich die Ergebnisse der kleinen Aufgaben kombinieren, um die Lösung für die große Aufgabe zu finden. Wir können diese aufgeteilten kleinen Aufgaben zur Berechnung an verschiedene Threads übergeben verteiltes Rechnen. Dies ähnelt dem verteilten Offline-Computing MapReduce in Big Data. Eine der klassischsten Anwendungen von ForkJoin ist Stream in Java8. Wir wissen, dass der parallele Stream auf ForkJoin angewiesen ist, um eine parallele Verarbeitung zu erreichen. von.

    Werfen wir einen Blick auf den Kern ForkJoinTask und ForkJoinPool. ForkJoinTaskForkJoinPool

    ForkJoinTask 任务

    ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口

    Fork/Join-Framework und Aufrufmethode in Java

    下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。

    ForkJoinTask最核心的莫过于fork()和join()方法了。

    fork()

    • 判断当前线程是不是ForkJoinWorkerThread线程

      • 是 直接将当前线程push到工作队列中

      • 否 调用ForkJoinPool 的externalPush方法

    ForkJoinPool构建了一个静态的common对象,这里调用的就是commonexternalPush()

    join()

    • 调用doJoin()方法,等待线程执行完成

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    
        public final V join() {
            int s;
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
        }
    
        private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
                externalAwaitDone();
        }
    
    	// 获取结果的方法由子类实现
    	public abstract V getRawResult();	

    RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。

    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    
        V result;
    
        protected abstract V compute();
    
        public final V getRawResult() {
            return result;
        }
    
        protected final void setRawResult(V value) {
            result = value;
        }
        protected final boolean exec() {
            result = compute();
            return true;
        }
    
    }

    ForkJoinPool 线程池

    ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理

    ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。

    Fork/Join-Framework und Aufrufmethode in Java

    工作窃取算法

    ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去“窃取”其它任务队列中的任务,这就是所谓的“工作窃取算法”。

    对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足“工作窃取”的需求,任务队列应该支持从“队尾”出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。

    构造方法

    首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?

    • int parallelism 可并行级别(不代表最多存在的线程数量)

    • ForkJoinWorkerThreadFactory factory 线程创建工厂

    • UncaughtExceptionHandler handler 异常捕获处理器

    • boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式

        public ForkJoinPool() {
            this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                 defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism) {
            this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode) {
            this(checkParallelism(parallelism),
                 checkFactory(factory),
                 handler,
                 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                 "ForkJoinPool-" + nextPoolId() + "-worker-");
            checkPermission();
        }

    提交方法

    下面我们看一下提交任务的方法:

    externalPush

    ForkJoinTask Task #🎜🎜##🎜🎜#Die Abhängigkeiten von ForkJoinTask selbst sind nicht kompliziert, genau wie die asynchrone Aufgabenberechnung FutureTask #🎜🎜##🎜🎜#Was ist ForkJoin in Java und wie nennt man es#🎜🎜##🎜🎜# Im Folgenden untersuchen wir den Kernquellcode von ForkJoinTask und wie die Aufgabe mithilfe der Divide-and-Conquer-Methode berechnet wird. #🎜🎜##🎜🎜#Der Kern von ForkJoinTask sind die Methoden fork() und join(). #🎜🎜##🎜🎜#fork()#🎜🎜#
    • #🎜🎜#Beurteilen Sie, ob der aktuelle Thread ein ForkJoinWorkerThread-Thread ist #🎜 🎜#
      • #🎜🎜# besteht darin, den aktuellen Thread direkt in die Arbeitswarteschlange zu verschieben #🎜🎜#
      • # 🎜🎜# Ob die externalPush-Methode von ForkJoinPool aufgerufen werden soll#🎜🎜#
      • #🎜🎜##🎜🎜##🎜🎜#Ein statisches gemeinsames Objekt wird in ForkJoinPool erstellt und was ist das? Hier aufgerufen wird common'sexternalPush()#🎜🎜##🎜🎜#join()#🎜🎜#
        • #🎜🎜#Rufen Sie die doJoin()-Methode auf und warten Sie, bis die Thread-Ausführung abgeschlossen ist#🎜🎜#
        • #🎜🎜#
              public ForkJoinTask<?> submit(Runnable task) {
                  if (task == null)
                      throw new NullPointerException();
                  ForkJoinTask<?> job;
                  if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                      job = (ForkJoinTask<?>) task;
                  else
                      job = new ForkJoinTask.AdaptedRunnableAction(task);
                  externalPush(job);
                  return job;
              }
          
              final void externalPush(ForkJoinTask<?> task) {
                  WorkQueue[] ws; WorkQueue q; int m;
                  int r = ThreadLocalRandom.getProbe();
                  int rs = runState;
                  if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                      (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                      U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a; int am, n, s;
                      if ((a = q.array) != null &&
                          (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                          int j = ((am & s) << ASHIFT) + ABASE;
                          U.putOrderedObject(a, j, task);
                          U.putOrderedInt(q, QTOP, s + 1);
                          U.putOrderedInt(q, QLOCK, 0);
                          if (n <= 1)
                              signalWork(ws, q);
                          return;
                      }
                      U.compareAndSwapInt(q, QLOCK, 1, 0);
                  }
                  externalSubmit(task);
              }
          
              private void externalSubmit(ForkJoinTask<?> task) {
                  int r;                                    // initialize caller&#39;s probe
                  if ((r = ThreadLocalRandom.getProbe()) == 0) {
                      ThreadLocalRandom.localInit();
                      r = ThreadLocalRandom.getProbe();
                  }
                  for (;;) {
                      WorkQueue[] ws; WorkQueue q; int rs, m, k;
                      boolean move = false;
                      if ((rs = runState) < 0) {
                          tryTerminate(false, false);     // help terminate
                          throw new RejectedExecutionException();
                      }
                      else if ((rs & STARTED) == 0 ||     // initialize
                               ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                          int ns = 0;
                          rs = lockRunState();
                          try {
                              if ((rs & STARTED) == 0) {
                                  U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                         new AtomicLong());
                                  // create workQueues array with size a power of two
                                  int p = config & SMASK; // ensure at least 2 slots
                                  int n = (p > 1) ? p - 1 : 1;
                                  n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                  n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                  workQueues = new WorkQueue[n];
                                  ns = STARTED;
                              }
                          } finally {
                              unlockRunState(rs, (rs & ~RSLOCK) | ns);
                          }
                      }
                      else if ((q = ws[k = r & m & SQMASK]) != null) {
                          if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                              ForkJoinTask<?>[] a = q.array;
                              int s = q.top;
                              boolean submitted = false; // initial submission or resizing
                              try {                      // locked version of push
                                  if ((a != null && a.length > s + 1 - q.base) ||
                                      (a = q.growArray()) != null) {
                                      int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                      U.putOrderedObject(a, j, task);
                                      U.putOrderedInt(q, QTOP, s + 1);
                                      submitted = true;
                                  }
                              } finally {
                                  U.compareAndSwapInt(q, QLOCK, 1, 0);
                              }
                              if (submitted) {
                                  signalWork(ws, q);
                                  return;
                              }
                          }
                          move = true;                   // move on failure
                      }
                      else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                          q = new WorkQueue(this, null);
                          q.hint = r;
                          q.config = k | SHARED_QUEUE;
                          q.scanState = INACTIVE;
                          rs = lockRunState();           // publish index
                          if (rs > 0 &&  (ws = workQueues) != null &&
                              k < ws.length && ws[k] == null)
                              ws[k] = q;                 // else terminated
                          unlockRunState(rs, rs & ~RSLOCK);
                      }
                      else
                          move = true;                   // move if busy
                      if (move)
                          r = ThreadLocalRandom.advanceProbe(r);
                  }
              }
          #🎜🎜#RecursiveTask ist eine Unterklasse von ForkJoinTask, die hauptsächlich die Methode zum Erhalten von Ergebnissen implementiert und die Ergebnisse durch Generika einschränkt. Wenn wir selbst eine Aufgabe erstellen müssen, müssen wir noch RecursiveTask implementieren und die Kernberechnungsmethode „compute()“ schreiben. #🎜🎜#
              final void signalWork(WorkQueue[] ws, WorkQueue q) {
                  long c; int sp, i; WorkQueue v; Thread p;
                  while ((c = ctl) < 0L) {                       // too few active
                      if ((sp = (int)c) == 0) {                  // no idle workers
                          if ((c & ADD_WORKER) != 0L)            // too few workers
                              tryAddWorker(c);
                          break;
                      }
                      if (ws == null)                            // unstarted/terminated
                          break;
                      if (ws.length <= (i = sp & SMASK))         // terminated
                          break;
                      if ((v = ws[i]) == null)                   // terminating
                          break;
                      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                      int d = sp - v.scanState;                  // screen CAS
                      long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                      if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                          v.scanState = vs;                      // activate v
                          if ((p = v.parker) != null)
                              U.unpark(p);
                          break;
                      }
                      if (q != null && q.base == q.top)          // no more work
                          break;
                  }
              }
          
              private void tryAddWorker(long c) {
                  boolean add = false;
                  do {
                      long nc = ((AC_MASK & (c + AC_UNIT)) |
                                 (TC_MASK & (c + TC_UNIT)));
                      if (ctl == c) {
                          int rs, stop;                 // check if terminating
                          if ((stop = (rs = lockRunState()) & STOP) == 0)
                              add = U.compareAndSwapLong(this, CTL, c, nc);
                          unlockRunState(rs, rs & ~RSLOCK);
                          if (stop != 0)
                              break;
                          if (add) {
                              createWorker();
                              break;
                          }
                      }
                  } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
              }
          
              private boolean createWorker() {
                  ForkJoinWorkerThreadFactory fac = factory;
                  Throwable ex = null;
                  ForkJoinWorkerThread wt = null;
                  try {
                      if (fac != null && (wt = fac.newThread(this)) != null) {
                          wt.start();
                          return true;
                      }
                  } catch (Throwable rex) {
                      ex = rex;
                  }
                  deregisterWorker(wt, ex);
                  return false;
              }
          
             final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                  WorkQueue w = null;
                  if (wt != null && (w = wt.workQueue) != null) {
                      WorkQueue[] ws;                           // remove index from array
                      int idx = w.config & SMASK;
                      int rs = lockRunState();
                      if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                          ws[idx] = null;
                      unlockRunState(rs, rs & ~RSLOCK);
                  }
                  long c;                                       // decrement counts
                  do {} while (!U.compareAndSwapLong
                               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                     (TC_MASK & (c - TC_UNIT)) |
                                                     (SP_MASK & c))));
                  if (w != null) {
                      w.qlock = -1;                             // ensure set
                      w.transferStealCount(this);
                      w.cancelAll();                            // cancel remaining tasks
                  }
                  for (;;) {                                    // possibly replace
                      WorkQueue[] ws; int m, sp;
                      if (tryTerminate(false, false) || w == null || w.array == null ||
                          (runState & STOP) != 0 || (ws = workQueues) == null ||
                          (m = ws.length - 1) < 0)              // already terminating
                          break;
                      if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                          if (tryRelease(c, ws[sp & m], AC_UNIT))
                              break;
                      }
                      else if (ex != null && (c & ADD_WORKER) != 0L) {
                          tryAddWorker(c);                      // create replacement
                          break;
                      }
                      else                                      // don&#39;t need replacement
                          break;
                  }
                  if (ex == null)                               // help clean on way out
                      ForkJoinTask.helpExpungeStaleExceptions();
                  else                                          // rethrow
                      ForkJoinTask.rethrow(ex);
              }
          
              public static interface ForkJoinWorkerThreadFactory {
                  public ForkJoinWorkerThread newThread(ForkJoinPool pool);
              }
              static final class DefaultForkJoinWorkerThreadFactory
                  implements ForkJoinWorkerThreadFactory {
                  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                      return new ForkJoinWorkerThread(pool);
                  }
              }
              protected ForkJoinWorkerThread(ForkJoinPool pool) {
                  // Use a placeholder until a useful name can be set in registerWorker
                  super("aForkJoinWorkerThread");
                  this.pool = pool;
                  this.workQueue = pool.registerWorker(this);
              }
          
              final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                  UncaughtExceptionHandler handler;
                  wt.setDaemon(true);                           // configure thread
                  if ((handler = ueh) != null)
                      wt.setUncaughtExceptionHandler(handler);
                  WorkQueue w = new WorkQueue(this, wt);
                  int i = 0;                                    // assign a pool index
                  int mode = config & MODE_MASK;
                  int rs = lockRunState();
                  try {
                      WorkQueue[] ws; int n;                    // skip if no array
                      if ((ws = workQueues) != null && (n = ws.length) > 0) {
                          int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                          int m = n - 1;
                          i = ((s << 1) | 1) & m;               // odd-numbered indices
                          if (ws[i] != null) {                  // collision
                              int probes = 0;                   // step by approx half n
                              int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                              while (ws[i = (i + step) & m] != null) {
                                  if (++probes >= n) {
                                      workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                      m = n - 1;
                                      probes = 0;
                                  }
                              }
                          }
                          w.hint = s;                           // use as random seed
                          w.config = i | mode;
                          w.scanState = i;                      // publication fence
                          ws[i] = w;
                      }
                  } finally {
                      unlockRunState(rs, rs & ~RSLOCK);
                  }
                  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                  return w;
              }
          #🎜🎜#ForkJoinPool-Thread-Pool#🎜🎜##🎜🎜#Viele Funktionen in ForkJoinPool hängen vom ForkJoinPool-Thread-Pool ab, sodass ForkJoinTask nicht ohne ForkJoinPool ausgeführt werden kann. ForkJoinPool weist viele Ähnlichkeiten mit ThreadPoolExecutor auf Speziell zum Ausführen von ForkJoinTask-Aufgaben habe ich in früheren Artikeln auch die Thread-Pool-Technologie vorgestellt. Wenn Sie interessiert sind, können Sie das Implementierungsprinzip des Thread-Pools (Pooling-Technologie) anhand des Java-Quellcodes analysieren #Die Vererbungsbeziehung zwischen ForkJoinPool und ThreadPoolExecutor ist fast gleich, sie entspricht einer brüderlichen Beziehung. #🎜🎜##🎜🎜#Was ist ForkJoin in Java und wie wird es aufgerufen? #🎜🎜#

          Arbeit stehlender Algorithmus

          #🎜🎜#ForkJoinPool verwendet einen Arbeits stehlenden Algorithmus. Wenn für jede Fork-Unteraufgabe ein neuer Thread erstellt wird, ist der Overhead für die Systemressourcen enorm. , also Es muss ein Thread-Pool verwendet werden. Ein allgemeiner Thread-Pool hat nur eine Aufgabenwarteschlange, aber für ForkJoinPool müssen diese parallelen Aufgaben in unterschiedliche Warteschlangen gestellt werden, da die von derselben Aufgabe ausgegebenen Unteraufgaben parallel sind, um die Effizienz zu verbessern und die Thread-Konkurrenz zu verringern Um verschiedene Aufgaben mit unterschiedlichen Geschwindigkeiten zu verarbeiten, kann es sein, dass ein Thread die Ausführung der Aufgaben in seiner eigenen Warteschlange zuerst abgeschlossen hat. Um die Effizienz zu verbessern, kann dem Thread zu diesem Zeitpunkt erlaubt werden, „zu stehlen“. Bei anderen Aufgaben in der Warteschlange handelt es sich um den sogenannten „Work Stealing-Algorithmus“. #🎜🎜##🎜🎜#Bei allgemeinen Warteschlangen befinden sich die in die Warteschlange eintretenden Elemente am Ende der Warteschlange und die Elemente, die die Warteschlange verlassen, am Anfang. Um den Anforderungen des „Arbeitsdiebstahls“ gerecht zu werden, sollte die Aufgabenwarteschlange vorhanden sein Unterstützt das Verlassen des Teamelements „Ende der Warteschlange“. Dies kann Konflikte mit anderen Arbeitsthreads reduzieren (da andere Arbeitsthreads Aufgaben in ihren eigenen Aufgabenwarteschlangen vom Kopf des Teams erhalten). Um das Problem zu lösen, muss eine Blockierungswarteschlange verwendet werden. #🎜🎜#

          Konstruktionsmethode

          #🎜🎜#Schauen wir uns zunächst die Konstruktionsmethode des ForkJoinPool-Thread-Pools an. Sie bietet uns drei Konstruktionsformen, von denen die komplexeste die Konstruktion ist Werfen wir einen Blick darauf, was die vier Eingabeparameter darstellen. #🎜🎜#
          • #🎜🎜#int Parallelitätsgrad (stellt nicht die maximale Anzahl vorhandener Threads dar) #🎜🎜#
          • #🎜 🎜#ForkJoinWorkerThreadFactory Factory Thread-Erstellungsfabrik#🎜🎜#
          • #🎜🎜#UncaughtExceptionHandler handler Ausnahmeerfassungshandler#🎜🎜#
          • #🎜🎜#boolean asyncMode first-in first-out Arbeitsmodus oder „Last-In-First-Out“-Arbeitsmodus#🎜🎜#
          • #🎜🎜#
            package com.zhj.interview;
            
            import java.util.*;
            import java.util.concurrent.ExecutionException;
            import java.util.concurrent.ForkJoinPool;
            import java.util.concurrent.RecursiveTask;
            
            public class Test16 {
            
                public static void main(String[] args) throws ExecutionException, InterruptedException {
                    int[] bigArr = new int[10000000];
                    for (int i = 0; i < 10000000; i++) {
                        bigArr[i] = (int) (Math.random() * 10000000);
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                    MyForkJoinTask task = new MyForkJoinTask(bigArr);
                    long start = System.currentTimeMillis();
                    forkJoinPool.submit(task).get();
                    long end = System.currentTimeMillis();
                    System.out.println("耗时:" + (end-start));
            	}
            
            }
            class MyForkJoinTask extends RecursiveTask<int[]> {
            
                private int source[];
            
                public MyForkJoinTask(int source[]) {
                    if (source == null) {
                        throw new RuntimeException("参数有误!!!");
                    }
                    this.source = source;
                }
            
                @Override
                protected int[] compute() {
                    int l = source.length;
                    if (l < 2) {
                        return Arrays.copyOf(source, l);
                    }
                    if (l == 2) {
                        if (source[0] > source[1]) {
                            int[] tar = new int[2];
                            tar[0] = source[1];
                            tar[1] = source[0];
                            return tar;
                        } else {
                            return Arrays.copyOf(source, l);
                        }
                    }
                    if (l > 2) {
                        int mid = l / 2;
                        MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                        task1.fork();
                        MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                        task2.fork();
                        int[] res1 = task1.join();
                        int[] res2 = task2.join();
                        int tar[] = merge(res1, res2);
                        return tar;
                    }
                    return null;
                }
            	// 合并数组
                private int[] merge(int[] res1, int[] res2) {
                    int l1 = res1.length;
                    int l2 = res2.length;
                    int l = l1 + l2;
                    int tar[] = new int[l];
                    for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                        int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                        int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                        // 如果条件成立,说明应该取数组array1中的值
                        if(v1 < v2) {
                            tar[i] = v1;
                            i1++;
                        } else {
                            tar[i] = v2;
                            i2++;
                        }
                    }
                    return tar;
                }
            }

            Einreichungsmethode

            #🎜🎜#Sehen wir uns an, wie man a einreicht Aufgabe: #🎜🎜##🎜🎜#externalPush Diese Methode ist uns sehr vertraut. Gerade wenn der aktuelle Thread nicht ForkJoinWorkerThread ist, wird dies auch für die neu übermittelte Aufgabe der Fall sein Führen Sie die Aufgabe über diese Methode aus. Es ist ersichtlich, dass Fork eine neue Unteraufgabe zur Übermittlung erstellen soll. #🎜🎜#

            externalSubmit是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。

            signalWork(ws, q)是发送工作信号,让工作队列进行运转。

                public ForkJoinTask<?> submit(Runnable task) {
                    if (task == null)
                        throw new NullPointerException();
                    ForkJoinTask<?> job;
                    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                        job = (ForkJoinTask<?>) task;
                    else
                        job = new ForkJoinTask.AdaptedRunnableAction(task);
                    externalPush(job);
                    return job;
                }
            
                final void externalPush(ForkJoinTask<?> task) {
                    WorkQueue[] ws; WorkQueue q; int m;
                    int r = ThreadLocalRandom.getProbe();
                    int rs = runState;
                    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                        ForkJoinTask<?>[] a; int am, n, s;
                        if ((a = q.array) != null &&
                            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                            int j = ((am & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            U.putOrderedInt(q, QLOCK, 0);
                            if (n <= 1)
                                signalWork(ws, q);
                            return;
                        }
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    externalSubmit(task);
                }
            
                private void externalSubmit(ForkJoinTask<?> task) {
                    int r;                                    // initialize caller&#39;s probe
                    if ((r = ThreadLocalRandom.getProbe()) == 0) {
                        ThreadLocalRandom.localInit();
                        r = ThreadLocalRandom.getProbe();
                    }
                    for (;;) {
                        WorkQueue[] ws; WorkQueue q; int rs, m, k;
                        boolean move = false;
                        if ((rs = runState) < 0) {
                            tryTerminate(false, false);     // help terminate
                            throw new RejectedExecutionException();
                        }
                        else if ((rs & STARTED) == 0 ||     // initialize
                                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                            int ns = 0;
                            rs = lockRunState();
                            try {
                                if ((rs & STARTED) == 0) {
                                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                           new AtomicLong());
                                    // create workQueues array with size a power of two
                                    int p = config & SMASK; // ensure at least 2 slots
                                    int n = (p > 1) ? p - 1 : 1;
                                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                    workQueues = new WorkQueue[n];
                                    ns = STARTED;
                                }
                            } finally {
                                unlockRunState(rs, (rs & ~RSLOCK) | ns);
                            }
                        }
                        else if ((q = ws[k = r & m & SQMASK]) != null) {
                            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                                ForkJoinTask<?>[] a = q.array;
                                int s = q.top;
                                boolean submitted = false; // initial submission or resizing
                                try {                      // locked version of push
                                    if ((a != null && a.length > s + 1 - q.base) ||
                                        (a = q.growArray()) != null) {
                                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                        U.putOrderedObject(a, j, task);
                                        U.putOrderedInt(q, QTOP, s + 1);
                                        submitted = true;
                                    }
                                } finally {
                                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                                }
                                if (submitted) {
                                    signalWork(ws, q);
                                    return;
                                }
                            }
                            move = true;                   // move on failure
                        }
                        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                            q = new WorkQueue(this, null);
                            q.hint = r;
                            q.config = k | SHARED_QUEUE;
                            q.scanState = INACTIVE;
                            rs = lockRunState();           // publish index
                            if (rs > 0 &&  (ws = workQueues) != null &&
                                k < ws.length && ws[k] == null)
                                ws[k] = q;                 // else terminated
                            unlockRunState(rs, rs & ~RSLOCK);
                        }
                        else
                            move = true;                   // move if busy
                        if (move)
                            r = ThreadLocalRandom.advanceProbe(r);
                    }
                }

            创建工人(线程)

            提交任务后,通过signalWork(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。

                final void signalWork(WorkQueue[] ws, WorkQueue q) {
                    long c; int sp, i; WorkQueue v; Thread p;
                    while ((c = ctl) < 0L) {                       // too few active
                        if ((sp = (int)c) == 0) {                  // no idle workers
                            if ((c & ADD_WORKER) != 0L)            // too few workers
                                tryAddWorker(c);
                            break;
                        }
                        if (ws == null)                            // unstarted/terminated
                            break;
                        if (ws.length <= (i = sp & SMASK))         // terminated
                            break;
                        if ((v = ws[i]) == null)                   // terminating
                            break;
                        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                        int d = sp - v.scanState;                  // screen CAS
                        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                            v.scanState = vs;                      // activate v
                            if ((p = v.parker) != null)
                                U.unpark(p);
                            break;
                        }
                        if (q != null && q.base == q.top)          // no more work
                            break;
                    }
                }
            
                private void tryAddWorker(long c) {
                    boolean add = false;
                    do {
                        long nc = ((AC_MASK & (c + AC_UNIT)) |
                                   (TC_MASK & (c + TC_UNIT)));
                        if (ctl == c) {
                            int rs, stop;                 // check if terminating
                            if ((stop = (rs = lockRunState()) & STOP) == 0)
                                add = U.compareAndSwapLong(this, CTL, c, nc);
                            unlockRunState(rs, rs & ~RSLOCK);
                            if (stop != 0)
                                break;
                            if (add) {
                                createWorker();
                                break;
                            }
                        }
                    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
                }
            
                private boolean createWorker() {
                    ForkJoinWorkerThreadFactory fac = factory;
                    Throwable ex = null;
                    ForkJoinWorkerThread wt = null;
                    try {
                        if (fac != null && (wt = fac.newThread(this)) != null) {
                            wt.start();
                            return true;
                        }
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                    deregisterWorker(wt, ex);
                    return false;
                }
            
               final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                    WorkQueue w = null;
                    if (wt != null && (w = wt.workQueue) != null) {
                        WorkQueue[] ws;                           // remove index from array
                        int idx = w.config & SMASK;
                        int rs = lockRunState();
                        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                            ws[idx] = null;
                        unlockRunState(rs, rs & ~RSLOCK);
                    }
                    long c;                                       // decrement counts
                    do {} while (!U.compareAndSwapLong
                                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                       (TC_MASK & (c - TC_UNIT)) |
                                                       (SP_MASK & c))));
                    if (w != null) {
                        w.qlock = -1;                             // ensure set
                        w.transferStealCount(this);
                        w.cancelAll();                            // cancel remaining tasks
                    }
                    for (;;) {                                    // possibly replace
                        WorkQueue[] ws; int m, sp;
                        if (tryTerminate(false, false) || w == null || w.array == null ||
                            (runState & STOP) != 0 || (ws = workQueues) == null ||
                            (m = ws.length - 1) < 0)              // already terminating
                            break;
                        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                            if (tryRelease(c, ws[sp & m], AC_UNIT))
                                break;
                        }
                        else if (ex != null && (c & ADD_WORKER) != 0L) {
                            tryAddWorker(c);                      // create replacement
                            break;
                        }
                        else                                      // don&#39;t need replacement
                            break;
                    }
                    if (ex == null)                               // help clean on way out
                        ForkJoinTask.helpExpungeStaleExceptions();
                    else                                          // rethrow
                        ForkJoinTask.rethrow(ex);
                }
            
                public static interface ForkJoinWorkerThreadFactory {
                    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
                }
                static final class DefaultForkJoinWorkerThreadFactory
                    implements ForkJoinWorkerThreadFactory {
                    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                        return new ForkJoinWorkerThread(pool);
                    }
                }
                protected ForkJoinWorkerThread(ForkJoinPool pool) {
                    // Use a placeholder until a useful name can be set in registerWorker
                    super("aForkJoinWorkerThread");
                    this.pool = pool;
                    this.workQueue = pool.registerWorker(this);
                }
            
                final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                    UncaughtExceptionHandler handler;
                    wt.setDaemon(true);                           // configure thread
                    if ((handler = ueh) != null)
                        wt.setUncaughtExceptionHandler(handler);
                    WorkQueue w = new WorkQueue(this, wt);
                    int i = 0;                                    // assign a pool index
                    int mode = config & MODE_MASK;
                    int rs = lockRunState();
                    try {
                        WorkQueue[] ws; int n;                    // skip if no array
                        if ((ws = workQueues) != null && (n = ws.length) > 0) {
                            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                            int m = n - 1;
                            i = ((s << 1) | 1) & m;               // odd-numbered indices
                            if (ws[i] != null) {                  // collision
                                int probes = 0;                   // step by approx half n
                                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                                while (ws[i = (i + step) & m] != null) {
                                    if (++probes >= n) {
                                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                        m = n - 1;
                                        probes = 0;
                                    }
                                }
                            }
                            w.hint = s;                           // use as random seed
                            w.config = i | mode;
                            w.scanState = i;                      // publication fence
                            ws[i] = w;
                        }
                    } finally {
                        unlockRunState(rs, rs & ~RSLOCK);
                    }
                    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                    return w;
                }

            例:ForkJoinTask实现归并排序

            这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。

            package com.zhj.interview;
            
            import java.util.*;
            import java.util.concurrent.ExecutionException;
            import java.util.concurrent.ForkJoinPool;
            import java.util.concurrent.RecursiveTask;
            
            public class Test16 {
            
                public static void main(String[] args) throws ExecutionException, InterruptedException {
                    int[] bigArr = new int[10000000];
                    for (int i = 0; i < 10000000; i++) {
                        bigArr[i] = (int) (Math.random() * 10000000);
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                    MyForkJoinTask task = new MyForkJoinTask(bigArr);
                    long start = System.currentTimeMillis();
                    forkJoinPool.submit(task).get();
                    long end = System.currentTimeMillis();
                    System.out.println("耗时:" + (end-start));
            	}
            
            }
            class MyForkJoinTask extends RecursiveTask<int[]> {
            
                private int source[];
            
                public MyForkJoinTask(int source[]) {
                    if (source == null) {
                        throw new RuntimeException("参数有误!!!");
                    }
                    this.source = source;
                }
            
                @Override
                protected int[] compute() {
                    int l = source.length;
                    if (l < 2) {
                        return Arrays.copyOf(source, l);
                    }
                    if (l == 2) {
                        if (source[0] > source[1]) {
                            int[] tar = new int[2];
                            tar[0] = source[1];
                            tar[1] = source[0];
                            return tar;
                        } else {
                            return Arrays.copyOf(source, l);
                        }
                    }
                    if (l > 2) {
                        int mid = l / 2;
                        MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                        task1.fork();
                        MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                        task2.fork();
                        int[] res1 = task1.join();
                        int[] res2 = task2.join();
                        int tar[] = merge(res1, res2);
                        return tar;
                    }
                    return null;
                }
            	// 合并数组
                private int[] merge(int[] res1, int[] res2) {
                    int l1 = res1.length;
                    int l2 = res2.length;
                    int l = l1 + l2;
                    int tar[] = new int[l];
                    for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                        int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                        int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                        // 如果条件成立,说明应该取数组array1中的值
                        if(v1 < v2) {
                            tar[i] = v1;
                            i1++;
                        } else {
                            tar[i] = v2;
                            i2++;
                        }
                    }
                    return tar;
                }
            }

            ForkJoin计算流程

            通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。

            Fork/Join-Framework und Aufrufmethode in Java

    Das obige ist der detaillierte Inhalt vonFork/Join-Framework und Aufrufmethode in Java. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen