>  기사  >  Java  >  Fork/Join 프레임워크 및 Java의 호출 메소드

Fork/Join 프레임워크 및 Java의 호출 메소드

WBOY
WBOY앞으로
2023-05-08 11:10:071596검색

    ForkJoin이란?

    ForkJoin 말 그대로 Fork는 분기를 의미하고 Join은 결합을 의미하며, 계산과 해결을 위해 큰 작업을 작은 작업으로 나누고 최종적으로 작은 작업을 나누어서 결과를 합친다고 이해할 수 있습니다. 대규모 작업에 대한 솔루션을 찾기 위해 이러한 분열된 작은 작업을 다른 스레드에 넘겨 계산할 수 있습니다. 이것이 분산 컴퓨팅의 아이디어입니다. 이는 빅 데이터의 분산 오프라인 컴퓨팅 MapReduce와 유사합니다. ForkJoin의 가장 고전적인 응용 프로그램 중 하나는 Stream이 직렬 스트림과 병렬 스트림으로 구분된다는 것을 알고 있습니다. 병렬 스트림은 병렬 처리를 위해 ForkJoin을 사용합니다. 의.

    핵심 ForkJoinTaskForkJoinPool을 살펴보겠습니다. ForkJoinTaskForkJoinPool

    ForkJoinTask 任务

    ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口

    Fork/Join 프레임워크 및 Java의 호출 메소드

    下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。

    ForkJoinTask最核心的莫过于fork()和join()方法了。

    fork()

    • 判断当前线程是不是ForkJoinWorkerThread线程

      • 是 直接将当前线程push到工作队列中

      • 否 调用ForkJoinPool 的externalPush方法

    ForkJoinPool构建了一个静态的common对象,这里调用的就是commonexternalPush()

    join()

    • 调用doJoin()方法,等待线程执行完成

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    
        public final V join() {
            int s;
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
        }
    
        private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
                externalAwaitDone();
        }
    
    	// 获取结果的方法由子类实现
    	public abstract V getRawResult();	

    RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。

    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    
        V result;
    
        protected abstract V compute();
    
        public final V getRawResult() {
            return result;
        }
    
        protected final void setRawResult(V value) {
            result = value;
        }
        protected final boolean exec() {
            result = compute();
            return true;
        }
    
    }

    ForkJoinPool 线程池

    ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理

    ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。

    Fork/Join 프레임워크 및 Java의 호출 메소드

    工作窃取算法

    ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去“窃取”其它任务队列中的任务,这就是所谓的“工作窃取算法”。

    对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足“工作窃取”的需求,任务队列应该支持从“队尾”出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。

    构造方法

    首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?

    • int parallelism 可并行级别(不代表最多存在的线程数量)

    • ForkJoinWorkerThreadFactory factory 线程创建工厂

    • UncaughtExceptionHandler handler 异常捕获处理器

    • boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式

        public ForkJoinPool() {
            this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                 defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism) {
            this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode) {
            this(checkParallelism(parallelism),
                 checkFactory(factory),
                 handler,
                 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                 "ForkJoinPool-" + nextPoolId() + "-worker-");
            checkPermission();
        }

    提交方法

    下面我们看一下提交任务的方法:

    externalPush

    ForkJoinTask 작업🎜🎜ForkJoinTask 자체의 종속성은 복잡하지 않습니다. 이는 비동기 작업 계산 FutureTask🎜🎜Java에서 ForkJoin이란 무엇이며 어떻게 호출하나요?🎜🎜ForkJoinTask의 핵심 소스 코드와 분할 정복 방식을 통해 작업이 어떻게 계산되는지 살펴보겠습니다. 🎜🎜ForkJoinTask의 핵심은 fork()와 Join() 메소드입니다. 🎜🎜fork()🎜
    • 🎜현재 스레드가 ForkJoinWorkerThread 스레드인지 확인🎜
      • 🎜는 현재 스레드를 작업 대기열에 직접 푸시하는 것입니다. 🎜
      • 🎜 ForkJoinPool의 externalPush 메서드를 호출할지 여부는 🎜
      • 🎜🎜🎜 ForkJoinPool 여기서는 commonexternalPush()🎜🎜join()🎜
        • 🎜doJoin() 메서드를 호출하고 스레드 실행이 완료될 때까지 기다립니다🎜
        • 🎜
              public ForkJoinTask<?> submit(Runnable task) {
                  if (task == null)
                      throw new NullPointerException();
                  ForkJoinTask<?> job;
                  if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                      job = (ForkJoinTask<?>) task;
                  else
                      job = new ForkJoinTask.AdaptedRunnableAction(task);
                  externalPush(job);
                  return job;
              }
          
              final void externalPush(ForkJoinTask<?> task) {
                  WorkQueue[] ws; WorkQueue q; int m;
                  int r = ThreadLocalRandom.getProbe();
                  int rs = runState;
                  if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                      (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                      U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a; int am, n, s;
                      if ((a = q.array) != null &&
                          (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                          int j = ((am & s) << ASHIFT) + ABASE;
                          U.putOrderedObject(a, j, task);
                          U.putOrderedInt(q, QTOP, s + 1);
                          U.putOrderedInt(q, QLOCK, 0);
                          if (n <= 1)
                              signalWork(ws, q);
                          return;
                      }
                      U.compareAndSwapInt(q, QLOCK, 1, 0);
                  }
                  externalSubmit(task);
              }
          
              private void externalSubmit(ForkJoinTask<?> task) {
                  int r;                                    // initialize caller&#39;s probe
                  if ((r = ThreadLocalRandom.getProbe()) == 0) {
                      ThreadLocalRandom.localInit();
                      r = ThreadLocalRandom.getProbe();
                  }
                  for (;;) {
                      WorkQueue[] ws; WorkQueue q; int rs, m, k;
                      boolean move = false;
                      if ((rs = runState) < 0) {
                          tryTerminate(false, false);     // help terminate
                          throw new RejectedExecutionException();
                      }
                      else if ((rs & STARTED) == 0 ||     // initialize
                               ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                          int ns = 0;
                          rs = lockRunState();
                          try {
                              if ((rs & STARTED) == 0) {
                                  U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                         new AtomicLong());
                                  // create workQueues array with size a power of two
                                  int p = config & SMASK; // ensure at least 2 slots
                                  int n = (p > 1) ? p - 1 : 1;
                                  n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                  n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                  workQueues = new WorkQueue[n];
                                  ns = STARTED;
                              }
                          } finally {
                              unlockRunState(rs, (rs & ~RSLOCK) | ns);
                          }
                      }
                      else if ((q = ws[k = r & m & SQMASK]) != null) {
                          if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                              ForkJoinTask<?>[] a = q.array;
                              int s = q.top;
                              boolean submitted = false; // initial submission or resizing
                              try {                      // locked version of push
                                  if ((a != null && a.length > s + 1 - q.base) ||
                                      (a = q.growArray()) != null) {
                                      int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                      U.putOrderedObject(a, j, task);
                                      U.putOrderedInt(q, QTOP, s + 1);
                                      submitted = true;
                                  }
                              } finally {
                                  U.compareAndSwapInt(q, QLOCK, 1, 0);
                              }
                              if (submitted) {
                                  signalWork(ws, q);
                                  return;
                              }
                          }
                          move = true;                   // move on failure
                      }
                      else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                          q = new WorkQueue(this, null);
                          q.hint = r;
                          q.config = k | SHARED_QUEUE;
                          q.scanState = INACTIVE;
                          rs = lockRunState();           // publish index
                          if (rs > 0 &&  (ws = workQueues) != null &&
                              k < ws.length && ws[k] == null)
                              ws[k] = q;                 // else terminated
                          unlockRunState(rs, rs & ~RSLOCK);
                      }
                      else
                          move = true;                   // move if busy
                      if (move)
                          r = ThreadLocalRandom.advanceProbe(r);
                  }
              }
          🎜RecursiveTask는 ForkJoinTask의 하위 클래스로 주로 결과를 얻는 메서드를 구현하고 제네릭을 통해 결과를 제한합니다. 작업을 직접 생성해야 하는 경우에도 RecursiveTask를 구현하고 핵심 계산 메서드인 Compute()를 작성해야 합니다. 🎜
              final void signalWork(WorkQueue[] ws, WorkQueue q) {
                  long c; int sp, i; WorkQueue v; Thread p;
                  while ((c = ctl) < 0L) {                       // too few active
                      if ((sp = (int)c) == 0) {                  // no idle workers
                          if ((c & ADD_WORKER) != 0L)            // too few workers
                              tryAddWorker(c);
                          break;
                      }
                      if (ws == null)                            // unstarted/terminated
                          break;
                      if (ws.length <= (i = sp & SMASK))         // terminated
                          break;
                      if ((v = ws[i]) == null)                   // terminating
                          break;
                      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                      int d = sp - v.scanState;                  // screen CAS
                      long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                      if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                          v.scanState = vs;                      // activate v
                          if ((p = v.parker) != null)
                              U.unpark(p);
                          break;
                      }
                      if (q != null && q.base == q.top)          // no more work
                          break;
                  }
              }
          
              private void tryAddWorker(long c) {
                  boolean add = false;
                  do {
                      long nc = ((AC_MASK & (c + AC_UNIT)) |
                                 (TC_MASK & (c + TC_UNIT)));
                      if (ctl == c) {
                          int rs, stop;                 // check if terminating
                          if ((stop = (rs = lockRunState()) & STOP) == 0)
                              add = U.compareAndSwapLong(this, CTL, c, nc);
                          unlockRunState(rs, rs & ~RSLOCK);
                          if (stop != 0)
                              break;
                          if (add) {
                              createWorker();
                              break;
                          }
                      }
                  } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
              }
          
              private boolean createWorker() {
                  ForkJoinWorkerThreadFactory fac = factory;
                  Throwable ex = null;
                  ForkJoinWorkerThread wt = null;
                  try {
                      if (fac != null && (wt = fac.newThread(this)) != null) {
                          wt.start();
                          return true;
                      }
                  } catch (Throwable rex) {
                      ex = rex;
                  }
                  deregisterWorker(wt, ex);
                  return false;
              }
          
             final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                  WorkQueue w = null;
                  if (wt != null && (w = wt.workQueue) != null) {
                      WorkQueue[] ws;                           // remove index from array
                      int idx = w.config & SMASK;
                      int rs = lockRunState();
                      if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                          ws[idx] = null;
                      unlockRunState(rs, rs & ~RSLOCK);
                  }
                  long c;                                       // decrement counts
                  do {} while (!U.compareAndSwapLong
                               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                     (TC_MASK & (c - TC_UNIT)) |
                                                     (SP_MASK & c))));
                  if (w != null) {
                      w.qlock = -1;                             // ensure set
                      w.transferStealCount(this);
                      w.cancelAll();                            // cancel remaining tasks
                  }
                  for (;;) {                                    // possibly replace
                      WorkQueue[] ws; int m, sp;
                      if (tryTerminate(false, false) || w == null || w.array == null ||
                          (runState & STOP) != 0 || (ws = workQueues) == null ||
                          (m = ws.length - 1) < 0)              // already terminating
                          break;
                      if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                          if (tryRelease(c, ws[sp & m], AC_UNIT))
                              break;
                      }
                      else if (ex != null && (c & ADD_WORKER) != 0L) {
                          tryAddWorker(c);                      // create replacement
                          break;
                      }
                      else                                      // don&#39;t need replacement
                          break;
                  }
                  if (ex == null)                               // help clean on way out
                      ForkJoinTask.helpExpungeStaleExceptions();
                  else                                          // rethrow
                      ForkJoinTask.rethrow(ex);
              }
          
              public static interface ForkJoinWorkerThreadFactory {
                  public ForkJoinWorkerThread newThread(ForkJoinPool pool);
              }
              static final class DefaultForkJoinWorkerThreadFactory
                  implements ForkJoinWorkerThreadFactory {
                  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                      return new ForkJoinWorkerThread(pool);
                  }
              }
              protected ForkJoinWorkerThread(ForkJoinPool pool) {
                  // Use a placeholder until a useful name can be set in registerWorker
                  super("aForkJoinWorkerThread");
                  this.pool = pool;
                  this.workQueue = pool.registerWorker(this);
              }
          
              final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                  UncaughtExceptionHandler handler;
                  wt.setDaemon(true);                           // configure thread
                  if ((handler = ueh) != null)
                      wt.setUncaughtExceptionHandler(handler);
                  WorkQueue w = new WorkQueue(this, wt);
                  int i = 0;                                    // assign a pool index
                  int mode = config & MODE_MASK;
                  int rs = lockRunState();
                  try {
                      WorkQueue[] ws; int n;                    // skip if no array
                      if ((ws = workQueues) != null && (n = ws.length) > 0) {
                          int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                          int m = n - 1;
                          i = ((s << 1) | 1) & m;               // odd-numbered indices
                          if (ws[i] != null) {                  // collision
                              int probes = 0;                   // step by approx half n
                              int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                              while (ws[i = (i + step) & m] != null) {
                                  if (++probes >= n) {
                                      workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                      m = n - 1;
                                      probes = 0;
                                  }
                              }
                          }
                          w.hint = s;                           // use as random seed
                          w.config = i | mode;
                          w.scanState = i;                      // publication fence
                          ws[i] = w;
                      }
                  } finally {
                      unlockRunState(rs, rs & ~RSLOCK);
                  }
                  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                  return w;
              }
          🎜ForkJoinPool 스레드 풀🎜🎜ForkJoinTask의 많은 기능은 ForkJoinPool 스레드 풀에 의존하므로 ForkJoinPool은 ForkJoinPool 없이는 실행할 수 없습니다. ForkJoinPool은 ForkJoinTask 작업을 실행하는 데 특별히 사용되는 스레드 풀입니다. 스레드 풀 기술을 소개합니다. - Java 소스 코드에서 스레드 풀(풀링 기술)의 구현 원리를 분석해 보세요.🎜🎜ForkJoinPool과 ThreadPoolExecutor의 상속 관계는 거의 동일합니다. 형제관계. 🎜🎜Java에서 ForkJoin이 무엇이며 호출하는 방법🎜

          작업 훔치기 알고리즘

          🎜ForkJoinPool은 작업 훔치기 알고리즘을 사용합니다. 각 포크 하위 작업마다 새 스레드가 생성되면 시스템 리소스에 대한 오버헤드가 커지므로 스레드 풀을 사용해야 합니다. 일반적인 스레드 풀에는 작업 대기열이 하나만 있지만 ForkJoinPool의 경우 동일한 작업에 의해 분기된 하위 작업이 병렬이므로 효율성을 높이고 스레드 경쟁을 줄이기 위해 이러한 병렬 작업을 스레드마다 다른 대기열에 배치해야 합니다. 서로 다른 작업을 서로 다른 속도로 처리하는 경우, 자신의 대기열에서 작업 실행을 먼저 마친 스레드가 있을 수 있습니다. 이때 효율성을 높이기 위해 스레드에 "훔치기"를 허용할 수 있습니다. 다른 작업을 대기열에 추가하는 것을 소위 "작업 훔치기 알고리즘"이라고 합니다. 🎜🎜일반 대기열의 경우 대기열에 들어가는 요소는 대기열의 끝에 있고 대기열에서 나가는 요소는 시작 부분에 있습니다. "작업 도용" 요구 사항을 충족하려면 작업 대기열에서 " 다른 작업자 스레드와의 충돌을 줄일 수 있는 tail"(다른 작업자 스레드가 대기열 헤드에서 자신의 작업 대기열에 있는 작업을 가져오기 때문에)은 이중 종료 차단 대기열을 사용하여 해결해야 합니다. 🎜

          구성 방법

          🎜먼저 ForkJoinPool 스레드 풀의 구성 방법을 살펴보겠습니다. 이는 세 가지 구성 형식을 제공하며 그 중 가장 복잡한 것은 4개의 입력 매개 변수 구성입니다. 아래를 보세요. 네 가지 인삼은 무엇을 상징하나요? 🎜
          • 🎜int 병렬 처리 병렬 처리 수준(기존 스레드의 최대 수를 나타내지 않음)🎜
          • 🎜ForkJoinWorkerThreadFactory 팩토리 스레드 생성 팩토리🎜
          • 🎜UncaughtExceptionHandler 핸들러 예외 캡처 핸들러🎜
          • 🎜boolean asyncMode 선입선출 작업 모드 또는 후입선출 작업 모드🎜
          • 🎜
            package com.zhj.interview;
            
            import java.util.*;
            import java.util.concurrent.ExecutionException;
            import java.util.concurrent.ForkJoinPool;
            import java.util.concurrent.RecursiveTask;
            
            public class Test16 {
            
                public static void main(String[] args) throws ExecutionException, InterruptedException {
                    int[] bigArr = new int[10000000];
                    for (int i = 0; i < 10000000; i++) {
                        bigArr[i] = (int) (Math.random() * 10000000);
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                    MyForkJoinTask task = new MyForkJoinTask(bigArr);
                    long start = System.currentTimeMillis();
                    forkJoinPool.submit(task).get();
                    long end = System.currentTimeMillis();
                    System.out.println("耗时:" + (end-start));
            	}
            
            }
            class MyForkJoinTask extends RecursiveTask<int[]> {
            
                private int source[];
            
                public MyForkJoinTask(int source[]) {
                    if (source == null) {
                        throw new RuntimeException("参数有误!!!");
                    }
                    this.source = source;
                }
            
                @Override
                protected int[] compute() {
                    int l = source.length;
                    if (l < 2) {
                        return Arrays.copyOf(source, l);
                    }
                    if (l == 2) {
                        if (source[0] > source[1]) {
                            int[] tar = new int[2];
                            tar[0] = source[1];
                            tar[1] = source[0];
                            return tar;
                        } else {
                            return Arrays.copyOf(source, l);
                        }
                    }
                    if (l > 2) {
                        int mid = l / 2;
                        MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                        task1.fork();
                        MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                        task2.fork();
                        int[] res1 = task1.join();
                        int[] res2 = task2.join();
                        int tar[] = merge(res1, res2);
                        return tar;
                    }
                    return null;
                }
            	// 合并数组
                private int[] merge(int[] res1, int[] res2) {
                    int l1 = res1.length;
                    int l2 = res2.length;
                    int l = l1 + l2;
                    int tar[] = new int[l];
                    for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                        int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                        int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                        // 如果条件成立,说明应该取数组array1中的值
                        if(v1 < v2) {
                            tar[i] = v1;
                            i1++;
                        } else {
                            tar[i] = v2;
                            i2++;
                        }
                    }
                    return tar;
                }
            }

            제출 방법

            🎜작업 제출 방법을 살펴보겠습니다. 🎜🎜externalPush 이 방법은 바로 포크할 때입니다. 현재 스레드가 ForkJoinWorkerThread가 아니고 새로 제출된 작업도 이 메서드를 통해 수행됩니다. 포크는 제출을 위한 새로운 하위 작업을 생성하는 것임을 알 수 있습니다. 🎜

            externalSubmit是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。

            signalWork(ws, q)是发送工作信号,让工作队列进行运转。

                public ForkJoinTask<?> submit(Runnable task) {
                    if (task == null)
                        throw new NullPointerException();
                    ForkJoinTask<?> job;
                    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                        job = (ForkJoinTask<?>) task;
                    else
                        job = new ForkJoinTask.AdaptedRunnableAction(task);
                    externalPush(job);
                    return job;
                }
            
                final void externalPush(ForkJoinTask<?> task) {
                    WorkQueue[] ws; WorkQueue q; int m;
                    int r = ThreadLocalRandom.getProbe();
                    int rs = runState;
                    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                        ForkJoinTask<?>[] a; int am, n, s;
                        if ((a = q.array) != null &&
                            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                            int j = ((am & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            U.putOrderedInt(q, QLOCK, 0);
                            if (n <= 1)
                                signalWork(ws, q);
                            return;
                        }
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    externalSubmit(task);
                }
            
                private void externalSubmit(ForkJoinTask<?> task) {
                    int r;                                    // initialize caller&#39;s probe
                    if ((r = ThreadLocalRandom.getProbe()) == 0) {
                        ThreadLocalRandom.localInit();
                        r = ThreadLocalRandom.getProbe();
                    }
                    for (;;) {
                        WorkQueue[] ws; WorkQueue q; int rs, m, k;
                        boolean move = false;
                        if ((rs = runState) < 0) {
                            tryTerminate(false, false);     // help terminate
                            throw new RejectedExecutionException();
                        }
                        else if ((rs & STARTED) == 0 ||     // initialize
                                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                            int ns = 0;
                            rs = lockRunState();
                            try {
                                if ((rs & STARTED) == 0) {
                                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                           new AtomicLong());
                                    // create workQueues array with size a power of two
                                    int p = config & SMASK; // ensure at least 2 slots
                                    int n = (p > 1) ? p - 1 : 1;
                                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                    workQueues = new WorkQueue[n];
                                    ns = STARTED;
                                }
                            } finally {
                                unlockRunState(rs, (rs & ~RSLOCK) | ns);
                            }
                        }
                        else if ((q = ws[k = r & m & SQMASK]) != null) {
                            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                                ForkJoinTask<?>[] a = q.array;
                                int s = q.top;
                                boolean submitted = false; // initial submission or resizing
                                try {                      // locked version of push
                                    if ((a != null && a.length > s + 1 - q.base) ||
                                        (a = q.growArray()) != null) {
                                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                        U.putOrderedObject(a, j, task);
                                        U.putOrderedInt(q, QTOP, s + 1);
                                        submitted = true;
                                    }
                                } finally {
                                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                                }
                                if (submitted) {
                                    signalWork(ws, q);
                                    return;
                                }
                            }
                            move = true;                   // move on failure
                        }
                        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                            q = new WorkQueue(this, null);
                            q.hint = r;
                            q.config = k | SHARED_QUEUE;
                            q.scanState = INACTIVE;
                            rs = lockRunState();           // publish index
                            if (rs > 0 &&  (ws = workQueues) != null &&
                                k < ws.length && ws[k] == null)
                                ws[k] = q;                 // else terminated
                            unlockRunState(rs, rs & ~RSLOCK);
                        }
                        else
                            move = true;                   // move if busy
                        if (move)
                            r = ThreadLocalRandom.advanceProbe(r);
                    }
                }

            创建工人(线程)

            提交任务后,通过signalWork(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。

                final void signalWork(WorkQueue[] ws, WorkQueue q) {
                    long c; int sp, i; WorkQueue v; Thread p;
                    while ((c = ctl) < 0L) {                       // too few active
                        if ((sp = (int)c) == 0) {                  // no idle workers
                            if ((c & ADD_WORKER) != 0L)            // too few workers
                                tryAddWorker(c);
                            break;
                        }
                        if (ws == null)                            // unstarted/terminated
                            break;
                        if (ws.length <= (i = sp & SMASK))         // terminated
                            break;
                        if ((v = ws[i]) == null)                   // terminating
                            break;
                        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                        int d = sp - v.scanState;                  // screen CAS
                        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                            v.scanState = vs;                      // activate v
                            if ((p = v.parker) != null)
                                U.unpark(p);
                            break;
                        }
                        if (q != null && q.base == q.top)          // no more work
                            break;
                    }
                }
            
                private void tryAddWorker(long c) {
                    boolean add = false;
                    do {
                        long nc = ((AC_MASK & (c + AC_UNIT)) |
                                   (TC_MASK & (c + TC_UNIT)));
                        if (ctl == c) {
                            int rs, stop;                 // check if terminating
                            if ((stop = (rs = lockRunState()) & STOP) == 0)
                                add = U.compareAndSwapLong(this, CTL, c, nc);
                            unlockRunState(rs, rs & ~RSLOCK);
                            if (stop != 0)
                                break;
                            if (add) {
                                createWorker();
                                break;
                            }
                        }
                    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
                }
            
                private boolean createWorker() {
                    ForkJoinWorkerThreadFactory fac = factory;
                    Throwable ex = null;
                    ForkJoinWorkerThread wt = null;
                    try {
                        if (fac != null && (wt = fac.newThread(this)) != null) {
                            wt.start();
                            return true;
                        }
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                    deregisterWorker(wt, ex);
                    return false;
                }
            
               final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                    WorkQueue w = null;
                    if (wt != null && (w = wt.workQueue) != null) {
                        WorkQueue[] ws;                           // remove index from array
                        int idx = w.config & SMASK;
                        int rs = lockRunState();
                        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                            ws[idx] = null;
                        unlockRunState(rs, rs & ~RSLOCK);
                    }
                    long c;                                       // decrement counts
                    do {} while (!U.compareAndSwapLong
                                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                       (TC_MASK & (c - TC_UNIT)) |
                                                       (SP_MASK & c))));
                    if (w != null) {
                        w.qlock = -1;                             // ensure set
                        w.transferStealCount(this);
                        w.cancelAll();                            // cancel remaining tasks
                    }
                    for (;;) {                                    // possibly replace
                        WorkQueue[] ws; int m, sp;
                        if (tryTerminate(false, false) || w == null || w.array == null ||
                            (runState & STOP) != 0 || (ws = workQueues) == null ||
                            (m = ws.length - 1) < 0)              // already terminating
                            break;
                        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                            if (tryRelease(c, ws[sp & m], AC_UNIT))
                                break;
                        }
                        else if (ex != null && (c & ADD_WORKER) != 0L) {
                            tryAddWorker(c);                      // create replacement
                            break;
                        }
                        else                                      // don&#39;t need replacement
                            break;
                    }
                    if (ex == null)                               // help clean on way out
                        ForkJoinTask.helpExpungeStaleExceptions();
                    else                                          // rethrow
                        ForkJoinTask.rethrow(ex);
                }
            
                public static interface ForkJoinWorkerThreadFactory {
                    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
                }
                static final class DefaultForkJoinWorkerThreadFactory
                    implements ForkJoinWorkerThreadFactory {
                    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                        return new ForkJoinWorkerThread(pool);
                    }
                }
                protected ForkJoinWorkerThread(ForkJoinPool pool) {
                    // Use a placeholder until a useful name can be set in registerWorker
                    super("aForkJoinWorkerThread");
                    this.pool = pool;
                    this.workQueue = pool.registerWorker(this);
                }
            
                final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                    UncaughtExceptionHandler handler;
                    wt.setDaemon(true);                           // configure thread
                    if ((handler = ueh) != null)
                        wt.setUncaughtExceptionHandler(handler);
                    WorkQueue w = new WorkQueue(this, wt);
                    int i = 0;                                    // assign a pool index
                    int mode = config & MODE_MASK;
                    int rs = lockRunState();
                    try {
                        WorkQueue[] ws; int n;                    // skip if no array
                        if ((ws = workQueues) != null && (n = ws.length) > 0) {
                            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                            int m = n - 1;
                            i = ((s << 1) | 1) & m;               // odd-numbered indices
                            if (ws[i] != null) {                  // collision
                                int probes = 0;                   // step by approx half n
                                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                                while (ws[i = (i + step) & m] != null) {
                                    if (++probes >= n) {
                                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                        m = n - 1;
                                        probes = 0;
                                    }
                                }
                            }
                            w.hint = s;                           // use as random seed
                            w.config = i | mode;
                            w.scanState = i;                      // publication fence
                            ws[i] = w;
                        }
                    } finally {
                        unlockRunState(rs, rs & ~RSLOCK);
                    }
                    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                    return w;
                }

            例:ForkJoinTask实现归并排序

            这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。

            package com.zhj.interview;
            
            import java.util.*;
            import java.util.concurrent.ExecutionException;
            import java.util.concurrent.ForkJoinPool;
            import java.util.concurrent.RecursiveTask;
            
            public class Test16 {
            
                public static void main(String[] args) throws ExecutionException, InterruptedException {
                    int[] bigArr = new int[10000000];
                    for (int i = 0; i < 10000000; i++) {
                        bigArr[i] = (int) (Math.random() * 10000000);
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                    MyForkJoinTask task = new MyForkJoinTask(bigArr);
                    long start = System.currentTimeMillis();
                    forkJoinPool.submit(task).get();
                    long end = System.currentTimeMillis();
                    System.out.println("耗时:" + (end-start));
            	}
            
            }
            class MyForkJoinTask extends RecursiveTask<int[]> {
            
                private int source[];
            
                public MyForkJoinTask(int source[]) {
                    if (source == null) {
                        throw new RuntimeException("参数有误!!!");
                    }
                    this.source = source;
                }
            
                @Override
                protected int[] compute() {
                    int l = source.length;
                    if (l < 2) {
                        return Arrays.copyOf(source, l);
                    }
                    if (l == 2) {
                        if (source[0] > source[1]) {
                            int[] tar = new int[2];
                            tar[0] = source[1];
                            tar[1] = source[0];
                            return tar;
                        } else {
                            return Arrays.copyOf(source, l);
                        }
                    }
                    if (l > 2) {
                        int mid = l / 2;
                        MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                        task1.fork();
                        MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                        task2.fork();
                        int[] res1 = task1.join();
                        int[] res2 = task2.join();
                        int tar[] = merge(res1, res2);
                        return tar;
                    }
                    return null;
                }
            	// 合并数组
                private int[] merge(int[] res1, int[] res2) {
                    int l1 = res1.length;
                    int l2 = res2.length;
                    int l = l1 + l2;
                    int tar[] = new int[l];
                    for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                        int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                        int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                        // 如果条件成立,说明应该取数组array1中的值
                        if(v1 < v2) {
                            tar[i] = v1;
                            i1++;
                        } else {
                            tar[i] = v2;
                            i2++;
                        }
                    }
                    return tar;
                }
            }

            ForkJoin计算流程

            通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。

            Fork/Join 프레임워크 및 Java의 호출 메소드

    위 내용은 Fork/Join 프레임워크 및 Java의 호출 메소드의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제