>  기사  >  웹 프론트엔드  >  Java 스레드 프로그래밍의 블로킹 큐 컨테이너에 대한 심층적인 이해_기본 지식

Java 스레드 프로그래밍의 블로킹 큐 컨테이너에 대한 심층적인 이해_기본 지식

WBOY
WBOY원래의
2016-05-16 15:27:271563검색

1. 차단 대기열이란 무엇인가요?

BlockingQueue는 두 가지 추가 작업을 지원하는 대기열입니다. 이러한 두 가지 추가 작업은 다음과 같습니다. 대기열이 비어 있으면 요소를 가져오는 스레드는 대기열이 비어 있지 않을 때까지 기다립니다. 대기열이 가득 차면 요소를 저장하는 스레드는 대기열을 사용할 수 있을 때까지 기다립니다. 차단 큐는 생산자 및 소비자 시나리오에서 자주 사용됩니다. 생산자는 큐에 요소를 추가하는 스레드이고 소비자는 큐에서 요소를 가져오는 스레드입니다. 차단 대기열은 생산자가 요소를 저장하고 소비자는 컨테이너에서 요소만 가져오는 컨테이너입니다.

차단 대기열은 네 가지 처리 방법을 제공합니다.

2015127142052051.png (522×105)

예외 발생: 차단 대기열이 가득 찬 경우 대기열에 요소를 삽입하면 IllegalStateException("큐 가득 참") 예외가 발생합니다. 큐가 비어 있으면 큐에서 요소를 가져올 때 NoSuchElementException이 발생합니다.
특수 값 반환: 삽입 메서드는 성공 여부를 반환하고, 성공하면 true를 반환합니다. 제거 방법은 큐에서 요소를 꺼내는 것이며, 요소가 없으면 null을 반환합니다
항상 차단: 차단 대기열이 가득 차면 생산자 스레드가 요소를 대기열에 넣으면 대기열은 데이터를 얻거나 인터럽트에 대한 응답으로 종료될 때까지 생산자 스레드를 차단합니다. 대기열이 비어 있고 소비자 스레드가 대기열에서 요소를 가져오려고 시도하면 대기열은 대기열을 사용할 수 있을 때까지 소비자 스레드도 차단합니다.
시간 초과 종료: 차단 큐가 가득 차면 큐는 일정 시간 동안 생산자 스레드를 차단합니다. 일정 시간을 초과하면 생산자 스레드가 종료됩니다.
2. Java의 차단 대기열

JDK7은 7개의 차단 대기열을 제공합니다. 그들은

  1. ArrayBlockingQueue: 배열 구조로 구성된 제한된 차단 큐입니다.
  2. LinkedBlockingQueue: 연결된 목록 구조로 구성된 제한된 차단 큐입니다.
  3. PriorityBlockingQueue: 우선순위 정렬을 지원하는 무제한 차단 큐입니다.
  4. DelayQueue: 우선순위 큐를 사용하여 구현된 무제한 차단 큐입니다.
  5. SynchronousQueue: 요소를 저장하지 않는 차단 큐입니다.
  6. LinkedTransferQueue: 연결된 목록 구조로 구성된 무제한 차단 큐입니다.
  7. LinkedBlockingDeque: 연결된 목록 구조로 구성된 양방향 차단 대기열입니다.

ArrayBlockingQueue는 배열을 사용하여 구현된 제한된 차단 큐입니다. 이 큐는 FIFO(선입선출) 방식으로 요소를 정렬합니다. 기본적으로 방문자는 대기열에 대한 공정한 액세스를 보장받지 못합니다. 소위 공정한 액세스 대기열은 차단된 모든 생산자 스레드 또는 소비자 스레드를 의미하며 대기열이 사용 가능하면 차단된 순서대로 대기열에 액세스할 수 있습니다. 생산자 스레드가 먼저 차단되면 요소를 먼저 대기열에 삽입할 수 있고, 먼저 차단하는 소비자 스레드는 대기열에서 요소를 먼저 가져올 수 있습니다. 일반적으로 공정성을 보장하기 위해 처리량을 줄입니다. 다음 코드를 사용하여 공정한 차단 대기열을 만들 수 있습니다:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

방문자 공정성은 재진입 잠금을 사용하여 달성되며 코드는 다음과 같습니다.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

LinkedBlockingQueue는 연결된 목록을 사용하여 구현된 제한된 차단 큐입니다. 이 큐의 기본 및 최대 길이는 Integer.MAX_VALUE입니다. 이 대기열은 선입선출 방식으로 요소를 정렬합니다.

PriorityBlockingQueue는 우선순위를 지원하는 무제한 큐입니다. 기본적으로 요소는 자연 순서로 정렬되며 요소의 정렬 규칙도 비교기를 통해 지정할 수 있습니다. 요소는 오름차순으로 정렬됩니다.

DelayQueue는 지연된 요소 획득을 지원하는 무제한 차단 대기열입니다. 대기열은 PriorityQueue를 사용하여 구현됩니다. 대기열의 요소는 Delayed 인터페이스를 구현해야 합니다. 요소를 생성할 때 대기열에서 현재 요소를 가져오는 데 걸리는 시간을 지정할 수 있습니다. 지연이 만료된 경우에만 대기열에서 요소를 가져올 수 있습니다. 다음 애플리케이션 시나리오에서 DelayQueue를 사용할 수 있습니다.

캐시 시스템 설계: DelayQueue를 사용하여 캐시 요소의 유효 기간을 저장하고, 스레드를 사용하여 루프에서 DelayQueue를 쿼리할 수 있습니다. 일단 DelayQueue에서 요소를 얻을 수 있으면 캐시 유효 기간이 만료되었습니다.
예약된 작업. DelayQueue를 사용하여 해당 날짜에 실행될 작업과 실행 시간을 저장합니다. DelayQueue에서 작업을 가져오면 해당 작업이 실행됩니다. 예를 들어 TimerQueue는 DelayQueue를 사용하여 구현됩니다.
대기열에서 지연된 요소의 순서를 지정하려면 CompareTo를 구현해야 합니다. 예를 들어 지연 시간이 가장 긴 항목을 대기열 끝에 넣습니다. 구현 코드는 다음과 같습니다.

public int compareTo(Delayed other) {
      if (other == this) // compare zero ONLY if same object
        return 0;
      if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask x = (ScheduledFutureTask)other;
        long diff = time - x.time;
        if (diff < 0)
          return -1;
        else if (diff > 0)
          return 1;
  else if (sequenceNumber < x.sequenceNumber)
          return -1;
        else
          return 1;
      }
      long d = (getDelay(TimeUnit.NANOSECONDS) -
           other.getDelay(TimeUnit.NANOSECONDS));
      return (d == 0) &#63; 0 : ((d < 0) &#63; -1 : 1);
    }

3. Delayed 인터페이스 구현 방법

ScheduledThreadPoolExecutor에서 ScheduledFutureTask 클래스를 참조할 수 있습니다. 이 클래스는 Delayed 인터페이스를 구현합니다. 첫째: 객체가 생성될 때 이전 객체를 사용할 수 있는 시간을 기록합니다. 코드는 다음과 같습니다.


ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      super(r, result);
      this.time = ns;
      this.period = period;
      this.sequenceNumber = sequencer.getAndIncrement();
}

그런 다음 getDelay를 사용하여 현재 요소를 얼마나 오랫동안 지연시켜야 하는지 쿼리합니다.

public long getDelay(TimeUnit unit) {
      return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因为getDelay时可以指定任意单位,一旦以纳秒作为单位,而延时的时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时,getDelay会返回负数。

4.如何实现延时队列

延时队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
          if (delay <= 0)
            return q.poll();
          else if (leader != null)
            available.await();

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。

tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。

在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。

5.阻塞队列的实现原理
本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

  首先看一下ArrayBlockingQueue类中的几个成员变量:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 
private static final long serialVersionUID = -817911632652898426L;
 
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
 
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
 
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}

   可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

  lock是一个可重入锁,notEmpty和notFull是等待条件。

  下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

public ArrayBlockingQueue(int capacity) {
}
public ArrayBlockingQueue(int capacity, boolean fair) {
 
}
public ArrayBlockingQueue(int capacity, boolean fair,
             Collection<&#63; extends E> c) {
}

   第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

  然后看它的两个关键方法的实现:put()和take():

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  final E[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == items.length)
        notFull.await();
    } catch (InterruptedException ie) {
      notFull.signal(); // propagate to non-interrupted thread
      throw ie;
    }
    insert(e);
  } finally {
    lock.unlock();
  }
}

   从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

  当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

  我们看一下insert方法的实现:

private void insert(E x) {
  items[putIndex] = x;
  putIndex = inc(putIndex);
  ++count;
  notEmpty.signal();
}

   它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

  下面是take()方法的实现:

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == 0)
        notEmpty.await();
    } catch (InterruptedException ie) {
      notEmpty.signal(); // propagate to non-interrupted thread
      throw ie;
    }
    E x = extract();
    return x;
  } finally {
    lock.unlock();
  }
}


   跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:


private E extract() {
  final E[] items = this.items;
  E x = items[takeIndex];
  items[takeIndex] = null;
  takeIndex = inc(takeIndex);
  --count;
  notFull.signal();
  return x;
}

   跟insert方法也很类似。

  其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.