Framework Fork/Join et méthode d'appel en Java

    Qu'est-ce que ForkJoin ?

    ForkJoin Littéralement, Fork signifie bifurcation, et Join signifie combinaison. Nous pouvons le comprendre comme diviser les grandes tâches en petites tâches pour le calcul et la solution, et enfin diviser les petites tâches en Les résultats sont combinés. pour trouver la solution à la grande tâche. Ces petites tâches fissurées peuvent être confiées à différents threads pour le calcul. Ceci est similaire à l'informatique distribuée hors ligne MapReduce dans le Big Data. L'une des applications les plus classiques de ForkJoin est Stream en Java8. Nous savons que Stream est divisé en flux série et flux parallèle. ParallelStream s'appuie sur ForkJoin pour réaliser un traitement parallèle. de.

    Jetons un coup d'œil aux noyaux ForkJoinTask et ForkJoinPool. ForkJoinTaskForkJoinPool

            public ForkJoinTask<?> submit(Runnable task) {
                if (task == null)
                    throw new NullPointerException();
                ForkJoinTask<?> job;
                if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                    job = (ForkJoinTask<?>) task;
                    job = new ForkJoinTask.AdaptedRunnableAction(task);
                return job;
            final void externalPush(ForkJoinTask<?> task) {
                WorkQueue[] ws; WorkQueue q; int m;
                int r = ThreadLocalRandom.getProbe();
                int rs = runState;
                if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                    (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                    U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a; int am, n, s;
                    if ((a = q.array) != null &&
                        (am = a.length - 1) > (n = (s = - q.base)) {
                        int j = ((am & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        U.putOrderedInt(q, QLOCK, 0);
                        if (n <= 1)
                            signalWork(ws, q);
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
            private void externalSubmit(ForkJoinTask<?> task) {
                int r;                                    // initialize caller&#39;s probe
                if ((r = ThreadLocalRandom.getProbe()) == 0) {
                    r = ThreadLocalRandom.getProbe();
                for (;;) {
                    WorkQueue[] ws; WorkQueue q; int rs, m, k;
                    boolean move = false;
                    if ((rs = runState) < 0) {
                        tryTerminate(false, false);     // help terminate
                        throw new RejectedExecutionException();
                    else if ((rs & STARTED) == 0 ||     // initialize
                             ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                        int ns = 0;
                        rs = lockRunState();
                        try {
                            if ((rs & STARTED) == 0) {
                                U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                       new AtomicLong());
                                // create workQueues array with size a power of two
                                int p = config & SMASK; // ensure at least 2 slots
                                int n = (p > 1) ? p - 1 : 1;
                                n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                workQueues = new WorkQueue[n];
                                ns = STARTED;
                        } finally {
                            unlockRunState(rs, (rs & ~RSLOCK) | ns);
                    else if ((q = ws[k = r & m & SQMASK]) != null) {
                        if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                            ForkJoinTask<?>[] a = q.array;
                            int s =;
                            boolean submitted = false; // initial submission or resizing
                            try {                      // locked version of push
                                if ((a != null && a.length > s + 1 - q.base) ||
                                    (a = q.growArray()) != null) {
                                    int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                    U.putOrderedObject(a, j, task);
                                    U.putOrderedInt(q, QTOP, s + 1);
                                    submitted = true;
                            } finally {
                                U.compareAndSwapInt(q, QLOCK, 1, 0);
                            if (submitted) {
                                signalWork(ws, q);
                        move = true;                   // move on failure
                    else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                        q = new WorkQueue(this, null);
                        q.hint = r;
                        q.config = k | SHARED_QUEUE;
                        q.scanState = INACTIVE;
                        rs = lockRunState();           // publish index
                        if (rs > 0 &&  (ws = workQueues) != null &&
                            k < ws.length && ws[k] == null)
                            ws[k] = q;                 // else terminated
                        unlockRunState(rs, rs & ~RSLOCK);
                        move = true;                   // move if busy
                    if (move)
                        r = ThreadLocalRandom.advanceProbe(r);
        🎜RecursiveTask est une sous-classe de ForkJoinTask qui implémente principalement la méthode d'obtention des résultats et contraint les résultats via des génériques. Si nous devons créer une tâche nous-mêmes, nous devons toujours implémenter RecursiveTask et écrire la méthode de calcul de base calculate(). 🎜
            final void signalWork(WorkQueue[] ws, WorkQueue q) {
                long c; int sp, i; WorkQueue v; Thread p;
                while ((c = ctl) < 0L) {                       // too few active
                    if ((sp = (int)c) == 0) {                  // no idle workers
                        if ((c & ADD_WORKER) != 0L)            // too few workers
                    if (ws == null)                            // unstarted/terminated
                    if (ws.length <= (i = sp & SMASK))         // terminated
                    if ((v = ws[i]) == null)                   // terminating
                    int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                    int d = sp - v.scanState;                  // screen CAS
                    long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                    if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                        v.scanState = vs;                      // activate v
                        if ((p = v.parker) != null)
                    if (q != null && q.base ==          // no more work
            private void tryAddWorker(long c) {
                boolean add = false;
                do {
                    long nc = ((AC_MASK & (c + AC_UNIT)) |
                               (TC_MASK & (c + TC_UNIT)));
                    if (ctl == c) {
                        int rs, stop;                 // check if terminating
                        if ((stop = (rs = lockRunState()) & STOP) == 0)
                            add = U.compareAndSwapLong(this, CTL, c, nc);
                        unlockRunState(rs, rs & ~RSLOCK);
                        if (stop != 0)
                        if (add) {
                } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
            private boolean createWorker() {
                ForkJoinWorkerThreadFactory fac = factory;
                Throwable ex = null;
                ForkJoinWorkerThread wt = null;
                try {
                    if (fac != null && (wt = fac.newThread(this)) != null) {
                        return true;
                } catch (Throwable rex) {
                    ex = rex;
                deregisterWorker(wt, ex);
                return false;
           final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                WorkQueue w = null;
                if (wt != null && (w = wt.workQueue) != null) {
                    WorkQueue[] ws;                           // remove index from array
                    int idx = w.config & SMASK;
                    int rs = lockRunState();
                    if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                        ws[idx] = null;
                    unlockRunState(rs, rs & ~RSLOCK);
                long c;                                       // decrement counts
                do {} while (!U.compareAndSwapLong
                             (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                   (TC_MASK & (c - TC_UNIT)) |
                                                   (SP_MASK & c))));
                if (w != null) {
                    w.qlock = -1;                             // ensure set
                    w.cancelAll();                            // cancel remaining tasks
                for (;;) {                                    // possibly replace
                    WorkQueue[] ws; int m, sp;
                    if (tryTerminate(false, false) || w == null || w.array == null ||
                        (runState & STOP) != 0 || (ws = workQueues) == null ||
                        (m = ws.length - 1) < 0)              // already terminating
                    if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                        if (tryRelease(c, ws[sp & m], AC_UNIT))
                    else if (ex != null && (c & ADD_WORKER) != 0L) {
                        tryAddWorker(c);                      // create replacement
                    else                                      // don&#39;t need replacement
                if (ex == null)                               // help clean on way out
                else                                          // rethrow
            public static interface ForkJoinWorkerThreadFactory {
                public ForkJoinWorkerThread newThread(ForkJoinPool pool);
            static final class DefaultForkJoinWorkerThreadFactory
                implements ForkJoinWorkerThreadFactory {
                public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                    return new ForkJoinWorkerThread(pool);
            protected ForkJoinWorkerThread(ForkJoinPool pool) {
                // Use a placeholder until a useful name can be set in registerWorker
                this.pool = pool;
                this.workQueue = pool.registerWorker(this);
            final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                UncaughtExceptionHandler handler;
                wt.setDaemon(true);                           // configure thread
                if ((handler = ueh) != null)
                WorkQueue w = new WorkQueue(this, wt);
                int i = 0;                                    // assign a pool index
                int mode = config & MODE_MASK;
                int rs = lockRunState();
                try {
                    WorkQueue[] ws; int n;                    // skip if no array
                    if ((ws = workQueues) != null && (n = ws.length) > 0) {
                        int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                        int m = n - 1;
                        i = ((s << 1) | 1) & m;               // odd-numbered indices
                        if (ws[i] != null) {                  // collision
                            int probes = 0;                   // step by approx half n
                            int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                            while (ws[i = (i + step) & m] != null) {
                                if (++probes >= n) {
                                    workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                    m = n - 1;
                                    probes = 0;
                        w.hint = s;                           // use as random seed
                        w.config = i | mode;
                        w.scanState = i;                      // publication fence
                        ws[i] = w;
                } finally {
                    unlockRunState(rs, rs & ~RSLOCK);
                wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                return w;
        🎜Pool de threads ForkJoinPool🎜🎜De nombreuses fonctions de ForkJoinTask dépendent du pool de threads ForkJoinPool, donc ForkJoinTask ne peut pas s'exécuter sans ForkJoinPool et présente de nombreuses similitudes avec ThreadPoolExecutor. introduisez la technologie du pool de threads. Si vous êtes intéressé, vous pouvez le lire - analyser le principe de mise en œuvre du pool de threads (technologie de pooling) à partir du code source Java🎜🎜La relation d'héritage entre ForkJoinPool et ThreadPoolExecutor est presque la même, ils sont assez similaires. relation fraternelle. 🎜🎜Qu'est-ce que ForkJoin en Java et comment l'appeler🎜

        Algorithme de vol de travail

        🎜ForkJoinPool utilise un algorithme de vol de travail si un nouveau thread est créé pour chaque sous-tâche fork, la surcharge sur les ressources système sera énorme, un pool de threads doit donc être utilisé. Un pool de threads général n'a qu'une seule file d'attente de tâches, mais pour ForkJoinPool, puisque les sous-tâches exécutées par la même tâche sont parallèles, afin d'améliorer l'efficacité et de réduire la concurrence des threads, ces tâches parallèles doivent être placées dans des files d'attente différentes, car les threads. traiter différentes tâches à des vitesses différentes, il peut y avoir un thread qui a fini d'exécuter les tâches dans sa propre file d'attente en premier. À ce stade, afin d'améliorer l'efficacité, le thread peut être autorisé à « voler ». d'autres tâches dans la file d'attente, c'est ce qu'on appelle "Algorithme de vol de travail". 🎜🎜Pour les files d'attente générales, les éléments entrant dans la file d'attente sont à la fin de la file d'attente et les éléments quittant la file d'attente sont au début. Pour répondre aux besoins de « vol de travail », la file d'attente des tâches doit prendre en charge le retrait des éléments de la file d'attente. tail", qui peut réduire les conflits avec d'autres threads de travail (car les autres threads de travail obtiendront des tâches dans leurs propres files d'attente de tâches à partir de la tête de la file d'attente) doivent être résolus en utilisant une file d'attente de blocage à double extrémité. 🎜

        Méthode de construction

        🎜Tout d'abord, regardons la méthode de construction du pool de threads ForkJoinPool. Elle nous propose trois formes de construction, dont la plus complexe est la construction de quatre paramètres d'entrée. regardez-le ci-dessous. Que représentent les quatre ginsengs ? 🎜
        • 🎜niveau de parallélisme int (ne représente pas le nombre maximum de threads existants)🎜
        • 🎜ForkJoinWorkerThreadFactory usine de création de threads d'usine🎜
        • 🎜Gestionnaire de capture d'exceptions du gestionnaire UncaughtExceptionHandler🎜
        • 🎜mode de travail booléen asyncMode premier entré, premier sorti ou mode de travail dernier entré, premier sorti🎜
        • 🎜
          package com.zhj.interview;
          import java.util.*;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.ForkJoinPool;
          import java.util.concurrent.RecursiveTask;
          public class Test16 {
              public static void main(String[] args) throws ExecutionException, InterruptedException {
                  int[] bigArr = new int[10000000];
                  for (int i = 0; i < 10000000; i++) {
                      bigArr[i] = (int) (Math.random() * 10000000);
                  ForkJoinPool forkJoinPool = new ForkJoinPool();
                  MyForkJoinTask task = new MyForkJoinTask(bigArr);
                  long start = System.currentTimeMillis();
                  long end = System.currentTimeMillis();
                  System.out.println("耗时:" + (end-start));
          class MyForkJoinTask extends RecursiveTask<int[]> {
              private int source[];
              public MyForkJoinTask(int source[]) {
                  if (source == null) {
                      throw new RuntimeException("参数有误!!!");
                  this.source = source;
              protected int[] compute() {
                  int l = source.length;
                  if (l < 2) {
                      return Arrays.copyOf(source, l);
                  if (l == 2) {
                      if (source[0] > source[1]) {
                          int[] tar = new int[2];
                          tar[0] = source[1];
                          tar[1] = source[0];
                          return tar;
                      } else {
                          return Arrays.copyOf(source, l);
                  if (l > 2) {
                      int mid = l / 2;
                      MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                      MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                      int[] res1 = task1.join();
                      int[] res2 = task2.join();
                      int tar[] = merge(res1, res2);
                      return tar;
                  return null;
          	// 合并数组
              private int[] merge(int[] res1, int[] res2) {
                  int l1 = res1.length;
                  int l2 = res2.length;
                  int l = l1 + l2;
                  int tar[] = new int[l];
                  for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                      int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                      int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                      // 如果条件成立,说明应该取数组array1中的值
                      if(v1 < v2) {
                          tar[i] = v1;
                      } else {
                          tar[i] = v2;
                  return tar;

          Méthode de soumission

          🎜Jetons un coup d'œil à la méthode de soumission des tâches : 🎜🎜externalPush Cette méthode nous est très familière. C'est exactement lors du forking If. le thread actuel n'est pas ForkJoinWorkerThread, la nouvelle tâche soumise l'est également. La tâche sera effectuée via cette méthode. On peut voir que fork consiste à créer une nouvelle sous-tâche à soumettre. 🎜


          signalWork(ws, q)是发送工作信号,让工作队列进行运转。

              public ForkJoinTask<?> submit(Runnable task) {
                  if (task == null)
                      throw new NullPointerException();
                  ForkJoinTask<?> job;
                  if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                      job = (ForkJoinTask<?>) task;
                      job = new ForkJoinTask.AdaptedRunnableAction(task);
                  return job;
              final void externalPush(ForkJoinTask<?> task) {
                  WorkQueue[] ws; WorkQueue q; int m;
                  int r = ThreadLocalRandom.getProbe();
                  int rs = runState;
                  if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                      (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                      U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                      ForkJoinTask<?>[] a; int am, n, s;
                      if ((a = q.array) != null &&
                          (am = a.length - 1) > (n = (s = - q.base)) {
                          int j = ((am & s) << ASHIFT) + ABASE;
                          U.putOrderedObject(a, j, task);
                          U.putOrderedInt(q, QTOP, s + 1);
                          U.putOrderedInt(q, QLOCK, 0);
                          if (n <= 1)
                              signalWork(ws, q);
                      U.compareAndSwapInt(q, QLOCK, 1, 0);
              private void externalSubmit(ForkJoinTask<?> task) {
                  int r;                                    // initialize caller&#39;s probe
                  if ((r = ThreadLocalRandom.getProbe()) == 0) {
                      r = ThreadLocalRandom.getProbe();
                  for (;;) {
                      WorkQueue[] ws; WorkQueue q; int rs, m, k;
                      boolean move = false;
                      if ((rs = runState) < 0) {
                          tryTerminate(false, false);     // help terminate
                          throw new RejectedExecutionException();
                      else if ((rs & STARTED) == 0 ||     // initialize
                               ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                          int ns = 0;
                          rs = lockRunState();
                          try {
                              if ((rs & STARTED) == 0) {
                                  U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                         new AtomicLong());
                                  // create workQueues array with size a power of two
                                  int p = config & SMASK; // ensure at least 2 slots
                                  int n = (p > 1) ? p - 1 : 1;
                                  n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                                  n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                                  workQueues = new WorkQueue[n];
                                  ns = STARTED;
                          } finally {
                              unlockRunState(rs, (rs & ~RSLOCK) | ns);
                      else if ((q = ws[k = r & m & SQMASK]) != null) {
                          if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                              ForkJoinTask<?>[] a = q.array;
                              int s =;
                              boolean submitted = false; // initial submission or resizing
                              try {                      // locked version of push
                                  if ((a != null && a.length > s + 1 - q.base) ||
                                      (a = q.growArray()) != null) {
                                      int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                      U.putOrderedObject(a, j, task);
                                      U.putOrderedInt(q, QTOP, s + 1);
                                      submitted = true;
                              } finally {
                                  U.compareAndSwapInt(q, QLOCK, 1, 0);
                              if (submitted) {
                                  signalWork(ws, q);
                          move = true;                   // move on failure
                      else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                          q = new WorkQueue(this, null);
                          q.hint = r;
                          q.config = k | SHARED_QUEUE;
                          q.scanState = INACTIVE;
                          rs = lockRunState();           // publish index
                          if (rs > 0 &&  (ws = workQueues) != null &&
                              k < ws.length && ws[k] == null)
                              ws[k] = q;                 // else terminated
                          unlockRunState(rs, rs & ~RSLOCK);
                          move = true;                   // move if busy
                      if (move)
                          r = ThreadLocalRandom.advanceProbe(r);


          提交任务后,通过signalWork(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。

              final void signalWork(WorkQueue[] ws, WorkQueue q) {
                  long c; int sp, i; WorkQueue v; Thread p;
                  while ((c = ctl) < 0L) {                       // too few active
                      if ((sp = (int)c) == 0) {                  // no idle workers
                          if ((c & ADD_WORKER) != 0L)            // too few workers
                      if (ws == null)                            // unstarted/terminated
                      if (ws.length <= (i = sp & SMASK))         // terminated
                      if ((v = ws[i]) == null)                   // terminating
                      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                      int d = sp - v.scanState;                  // screen CAS
                      long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                      if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                          v.scanState = vs;                      // activate v
                          if ((p = v.parker) != null)
                      if (q != null && q.base ==          // no more work
              private void tryAddWorker(long c) {
                  boolean add = false;
                  do {
                      long nc = ((AC_MASK & (c + AC_UNIT)) |
                                 (TC_MASK & (c + TC_UNIT)));
                      if (ctl == c) {
                          int rs, stop;                 // check if terminating
                          if ((stop = (rs = lockRunState()) & STOP) == 0)
                              add = U.compareAndSwapLong(this, CTL, c, nc);
                          unlockRunState(rs, rs & ~RSLOCK);
                          if (stop != 0)
                          if (add) {
                  } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
              private boolean createWorker() {
                  ForkJoinWorkerThreadFactory fac = factory;
                  Throwable ex = null;
                  ForkJoinWorkerThread wt = null;
                  try {
                      if (fac != null && (wt = fac.newThread(this)) != null) {
                          return true;
                  } catch (Throwable rex) {
                      ex = rex;
                  deregisterWorker(wt, ex);
                  return false;
             final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
                  WorkQueue w = null;
                  if (wt != null && (w = wt.workQueue) != null) {
                      WorkQueue[] ws;                           // remove index from array
                      int idx = w.config & SMASK;
                      int rs = lockRunState();
                      if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                          ws[idx] = null;
                      unlockRunState(rs, rs & ~RSLOCK);
                  long c;                                       // decrement counts
                  do {} while (!U.compareAndSwapLong
                               (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                                     (TC_MASK & (c - TC_UNIT)) |
                                                     (SP_MASK & c))));
                  if (w != null) {
                      w.qlock = -1;                             // ensure set
                      w.cancelAll();                            // cancel remaining tasks
                  for (;;) {                                    // possibly replace
                      WorkQueue[] ws; int m, sp;
                      if (tryTerminate(false, false) || w == null || w.array == null ||
                          (runState & STOP) != 0 || (ws = workQueues) == null ||
                          (m = ws.length - 1) < 0)              // already terminating
                      if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                          if (tryRelease(c, ws[sp & m], AC_UNIT))
                      else if (ex != null && (c & ADD_WORKER) != 0L) {
                          tryAddWorker(c);                      // create replacement
                      else                                      // don&#39;t need replacement
                  if (ex == null)                               // help clean on way out
                  else                                          // rethrow
              public static interface ForkJoinWorkerThreadFactory {
                  public ForkJoinWorkerThread newThread(ForkJoinPool pool);
              static final class DefaultForkJoinWorkerThreadFactory
                  implements ForkJoinWorkerThreadFactory {
                  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                      return new ForkJoinWorkerThread(pool);
              protected ForkJoinWorkerThread(ForkJoinPool pool) {
                  // Use a placeholder until a useful name can be set in registerWorker
                  this.pool = pool;
                  this.workQueue = pool.registerWorker(this);
              final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
                  UncaughtExceptionHandler handler;
                  wt.setDaemon(true);                           // configure thread
                  if ((handler = ueh) != null)
                  WorkQueue w = new WorkQueue(this, wt);
                  int i = 0;                                    // assign a pool index
                  int mode = config & MODE_MASK;
                  int rs = lockRunState();
                  try {
                      WorkQueue[] ws; int n;                    // skip if no array
                      if ((ws = workQueues) != null && (n = ws.length) > 0) {
                          int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                          int m = n - 1;
                          i = ((s << 1) | 1) & m;               // odd-numbered indices
                          if (ws[i] != null) {                  // collision
                              int probes = 0;                   // step by approx half n
                              int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                              while (ws[i = (i + step) & m] != null) {
                                  if (++probes >= n) {
                                      workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                      m = n - 1;
                                      probes = 0;
                          w.hint = s;                           // use as random seed
                          w.config = i | mode;
                          w.scanState = i;                      // publication fence
                          ws[i] = w;
                  } finally {
                      unlockRunState(rs, rs & ~RSLOCK);
                  wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
                  return w;



