>  기사  >  Java  >  Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)

Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)

黄舟
黄舟원래의
2017-03-29 10:31:221771검색

본 글은 주로 Java의 ThreadPoolExecutor 원리 분석 관련 정보를 소개하고 있으며, 필요하신 분들은

Java의 ThreadPoolExecutor 원리 분석

을 참고하시기 바랍니다. 🎜> Thread Pool 소개

Java Thread Pool은 개발 시 흔히 사용하는 도구로, 비동기식, 병렬 작업을 처리할 때 자주 사용됩니다. 또는 서버를 구현할 때

연결 처리 요청을 받기 위해 스레드 풀도 사용해야 합니다.

스레드 풀은

을 사용합니다. JDK에서 제공하는 스레드 풀 구현은

java.util.concurrent.ThreadPoolExecutor에 있습니다. 사용 시 일반적으로 submit, InvokeAll, shutdown 등의 공통 메소드를 제공하는 ExecutorService인터페이스를 사용한다.

스레드 풀 구성 측면에서 Executors 클래스는

newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor정적 메서드를 제공합니다. 🎜> 등, 이러한 메서드는 결국 ThreadPoolExecutor생성자를 호출합니다. 모든 매개변수를 포함하는 ThreadPoolExecutor의 생성자는

/**
   * @param corePoolSize the number of threads to keep in the pool, even
   *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *    pool
   * @param keepAliveTime when the number of threads is greater than
   *    the core, this is the maximum time that excess idle threads
   *    will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *    executed. This queue will hold only the {@code Runnable}
   *    tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *    creates a new thread
   * @param handler the handler to use when execution is blocked
   *    because the thread bounds and queue capacities are reached
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }

    corePoolSize입니다. 새 작업을 추가할 때 스레드 풀의 코어 스레드 수를 설정합니다. 스레드 풀의 스레드 corePoolSize보다 작으면 현재 유휴 스레드가 있는지 여부에 관계없이 작업을 수행하기 위해 새 스레드가 생성됩니다.
  • maximunPoolSize는 스레드 풀에 허용되는 최대 스레드 수입니다.
  • workQueue는 대기 중인 작업을 저장하는 데 사용됩니다
  • keepAliveTime은 corePoolSize
  • 보다 큰 스레드에 대한 유휴 시간 초과 시간입니다. 작업이 이스케이프되고 스레드 풀이 닫힐 때 작업 처리에 사용됩니다. 스레드 풀은 현재 스레드 수가 corePoolSize보다 적으면
  • 에서 새

    스레드를 추가합니다. 스레드 수 = corePoolSize 및 corePoolSize인 경우 workQueue가 새 작업을 저장할 수 없는 경우에만 새 스레드가 생성됩니다. 초과 스레드는 유휴 keepAliveTime 후에 삭제됩니다.

구현(JDK1.8 기준)


ThreadPoolExecutor에 저장된 상태는

입니다.
RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED를 포함한 현재 스레드 풀 상태입니다.


현재 실행 중인 스레드의 유효 개수입니다.


이 두 상태를 int 변수에 넣으면 처음 3자리는 스레드 풀 상태, 마지막 29자리는 스레드 개수입니다.


예를 들어 0b11100000000000000000000000000001은 스레드인 RUNNING을 나타냅니다.

HashSet을 통해 작업자 세트를 저장합니다. HashSet에 액세스하기 전에 먼저 보호된 mainLock:ReentrantLock

제출, 실행


execute 실행 방법은 먼저 현재 워커 개수를 확인하는 것인데, corePoolSize보다 작다면 코어 워커를 추가해 보세요. 스레드 풀은 스레드 수 유지 및 상태 확인에 대해 많은 테스트를 수행합니다.

public void execute(Runnable command) {
    int c = ctl.get();
    // 如果当期数量小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
      // 尝试增加worker
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }
    // 如果线程池正在运行并且成功添加到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
      // 再次检查状态,如果已经关闭则执行拒绝处理
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      // 如果工作线程都down了
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    else if (!addWorker(command, false))
      reject(command);
  }

addWorker 메소드 구현

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);
      // Check if queue empty only if necessary.
      if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
          firstTask == null &&
          ! workQueue.isEmpty()))
        return false;
      for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
          return false;
        if (compareAndIncrementWorkerCount(c))
          break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
          continue retry;
        // else CAS failed due to workerCount change; retry inner loop
      }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          // Recheck while holding lock.
          // Back out on ThreadFactory failure or if
          // shut down before lock acquired.
          int rs = runStateOf(ctl.get());
          if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
              throw new IllegalThreadStateException();
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
              largestPoolSize = s;
            workerAdded = true;
          }
        } finally {
          mainLock.unlock();
        }
        if (workerAdded) {
          // 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
          t.start();
          workerStarted = true;
        }
      }
    } finally {
      if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
  }
Worker 클래스는 동기 대기 기능을 얻기 위해 AbstractQueuedSynchronizer를 상속합니다.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
  {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker */
    public void run() {
      runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
      return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    protected boolean tryRelease(int unused) {
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }
    public void lock()    { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock()   { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
      Thread t;
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        }
      }
    }

runWorker(Worker)는 Work Queue에서 지속적으로 작업을 가져와서 실행하는 Worker의 폴링 실행 로직입니다. 작업을 실행하는 동안 작업자가 중단되는 것을 방지하려면 각 작업을 실행하기 전에 작업자를 잠가야 합니다.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      while (task != null || (task = getTask()) != null) {
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted. This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
           runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
          wt.interrupt();
        try {
          beforeExecute(wt, task);
          Throwable thrown = null;
          try {
            task.run();
          } catch (RuntimeException x) {
            thrown = x; throw x;
          } catch (Error x) {
            thrown = x; throw x;
          } catch (Throwable x) {
            thrown = x; throw new Error(x);
          } finally {
            afterExecute(task, thrown);
          }
        } finally {
          task = null;
          w.completedTasks++;
          w.unlock();
        }
      }
      completedAbruptly = false;
    } finally {
      processWorkerExit(w, completedAbruptly);
    }
  }

ThreadPoolExecutor의 submit 메소드에서 Callable은 FutureTask로 패키징된 후 Execute 메소드로 넘겨집니다.

FutureTask


FutureTask는 Runnable과 Future에서 상속합니다.

NEW, 아직 실행되지 않음,

COMPLETING,
NORMAL을 실행하면 정상 실행이 완료되고 결과를 얻습니다
EXCEPTIONAL, 실행
예외가 발생합니다
CANCELLED, 실행이 취소됩니다INTERRUPTING, 실행이 중단됩니다
INTERRUPTED, 중단되었습니다.


키 get 메소드

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }

가 먼저 현재 상태를 가져옵니다. 실행이 완료되지 않고 정상이면 대기 결과 프로세스로 들어갑니다. 현재 상태를 얻기 위해 계속해서

루프를 돌립니다. 결과가 없으면 CAS를 통해 대기 목록의 선두에 자신을 추가합니다. 시간 초과가 설정된 경우 LockSupport.parkNanos는 지정된 시간에 추가됩니다.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
      else if (q == null)
        q = new WaitNode();
      else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      }
      else
        LockSupport.park(this);
    }
  }
FutureTask의 실행 메소드는 작업을 실행하고 결과의 위치를 ​​설정하는 것입니다. 먼저 현재 상태가 NEW인지 확인하고 현재 스레드를 실행 스레드로 설정한 다음 Callable의 호출을 호출하여 가져옵니다. 결과를 설정하고 결과를 설정하여 FutureTask 상태를 수정합니다. 아아아아

위 내용은 Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.