>  기사  >  Java  >  동시 패킷 차단 대기열을 위한 ArrayBlockingQueue

동시 패킷 차단 대기열을 위한 ArrayBlockingQueue

巴扎黑
巴扎黑원래의
2017-06-26 09:31:191273검색

jdk1.7.0_79

이전 섹션에서 동시 패키지의 비차단 대기열 ConcurrentLinkedQueue의 대기열 추가 및 대기열 해제 간략한 분석이 이루어졌습니다. 이 문서에서는 동시 패키지의 차단 대기열을 간략하게 분석합니다.

  Java 동시성 패키지에는 총 7 차단 대기열이 있으며 물론 모두 스레드로부터 안전합니다.

 ArrayBlockingQueue: 배열 구조로 구성된 제한된 차단 큐입니다.

 LinkedBlockingQueue: 연결된 목록 구조로 구성된 제한된 차단 큐입니다.

 PriorityBlockingQueue: 우선순위 정렬을 지원하는 무제한 차단 큐입니다.

  DealyQueue: 우선순위 큐를 사용하여 구현된 무제한 차단 큐입니다.

 SynchronousQueue: 요소를 저장하지 않는 차단 큐입니다.

 LinkedTransferQueue: 연결된 목록 구조로 구성된 무한한 차단 큐입니다.

 LinkedBlockingDeque: 연결된 목록 구조로 구성된 양방향 차단 대기열입니다. ("JavaThe Art of Concurrent 프로그래밍"에서 발췌)

이 기사에서는ArrayBlockingQueueblocking queue에 대해 간략하게 분석합니다.

ArrayLinkedQueue의 경우 과거를 보면 ReentrantLock에 의해 보안이 보장됩니다. ReentrantLock에 대한 분석은 "5.Lock인터페이스 및 구현ReentrantLock을 참조하세요. ", 아래에서도 적절하게 언급하겠습니다.

 먼저 생성자를 살펴보겠습니다.

Construction method

public ArrayBlockingQueue(intcapacity)

지정된 크기의 제한된 대기열을 구성합니다

public ArrayBlockingQueue(intcapacity, booleanfair)

다음의 제한된 대기열을 구성합니다. 지정된 크기, 공정 또는 불공정 잠금으로 지정

public ArrayBlockingQueue(intcapacity, booleanfair , 컬렉션지정된 크기의 제한된 대기열을 구성하고, 공정 또는 불공정 잠금으로 지정하고, 추가하도록 지정합니다. 컬렉션

 1 public ArrayBlockingQueue(int capacity) { 
 2   this(capacity, false);//默认构造非公平锁的阻塞队列  3 } 
 4 public ArrayBlockingQueue(int capacity, boolean fair) { 
 5   if (capacity <= 0)  
 6     throw new IllegalArgumentException(); 
 7   this.items = new Object[capacity]; 
 8   lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁  9   notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》10   notFull = lock.newCondition;//初始化非满等待队列 11 } 
12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
13   this(capacity, fair); 
14   final ReentrantLock lock = this.lock; 
15   lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16   try { 
17     int i = 0; 
18     try { 
19       for (E e : c) { 
20         checkNotNull(e); 
21         item[i++] = e;//将集合添加进数组构成的队列中 22       } 
23     } catch (ArrayIndexOutOfBoundsException ex) { 
24       throw new IllegalArgumentException(); 
25     } 
26     count = i;//队列中的实际数据数量 27     putIndex = (i == capacity) ? 0 : i; 
28   } finally { 
29     lock.unlock(); 
30   } 
31 }</span></span></span></span></span></span></span></span></span></span></p></div>
<div class="OutlineElement Ltr SCX196234819"><p class="Paragraph SCX196234819"><span class="TextRun SCX196234819"> 15번째 줄<span class="TextRun SCX196234819"><span class="TextRun SCX196234819">에 소스 코드에 다음과 같은 주석이 있습니다. <span class="TextRun SCX196234819"> 상호 <span class="TextRun SCX196234819">배제<span class="TextRun SCX196234819">가 아닌 가시성을 위해서만 잠금. 이 문장의 의미는 이 잠금 장치의 작동이 상호 배제 작업을 위한 것이 아니라 가시성을 보장하기 위한 것임을 제공하는 것입니다. 스레드 <span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819">는 인스턴스화된 <span class="TextRun SCX196234819"><span class="SpellingError SCX196234819">ArrayBlockingQueue<span class="TextRun SCX196234819"> 개체이고, <span class="TextRun SCX196234819">T2<span class="TextRun SCX196234819">는 인스턴스화된 <span class="TextRun SCX196234819"><span class="SpellingError SCX196234819">ArrayBlockingQueue<span class="TextRun SCX196234819"> 개체에 대한 대기열에 추가 작업입니다(물론 <span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819"> 및 <span class="TextRun SCX196234819">T2<span class="TextRun SCX196234819">의 실행 순서가 보장되어야 함). 실행되지 않음 잠금 작업(잠금은 가시성을 보장합니다. 즉, 주 메모리에 다시 기록됨), <span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819">의 <span class="TextRun SCX196234819">c<span class="TextRun SCX196234819"> 컬렉션은 <span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819"> 스레드에 의해 유지 관리되는 캐시에만 존재할 수 있으며 다시 기록되지 않습니다. <span class="TextRun SCX196234819">T2 <span class="TextRun SCX196234819">에서 인스턴스화된 <span class="TextRun SCX196234819"><span class="SpellingError SCX196234819">ArrayBlockingQueue<span class="TextRun SCX196234819">에 의해 유지되는 캐시와 메인 메모리에는 컬렉션 <span class="TextRun SCX196234819">c<span class="TextRun SCX196234819">이 없습니다. 이때 가시성으로 인해 데이터 불일치가 발생하여 스레드 안전 문제가 발생합니다<span class="TextRun SCX196234819">. ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ <span class="EOP SCX196234819"></span></span></span></span></span></span></span></span></span></span> 다음은 </span>의 출력 중 일부입니다. </span>ArrayBlockingQueue</span> 팀 입장 작업. </span></span></span></span></span></span></span></span></span></span></span></span></span>큐 </span></span> 요소 삽입 </span></span></span></span></span></span></span></p></div>
<div class="OutlineElement Ltr SCX196234819">
<p class="Paragraph SCX196234819"><span class="TextRun SCX196234819"><span class="TextRun SCX196234819"> <span class="SpellingError SCX196234819"> <span class="TextRun SCX196234819"></span></span></span></span></p>
<h2 class="Paragraph SCX196234819"><strong><span class="TextRun SCX196234819"> <span class="TextRun SCX196234819"><span class="SpellingError SCX196234819"><span class="TextRun SCX196234819"><span class="TextRun SCX37794459"><span class="NormalTextRun SCX37794459"><span class="TextRun SCX37794459"><span class="NormalTextRun SCX37794459"><span class="TextRun SCX37794459"> <span class="NormalTextRun SCX37794459"></span></span>예외 발생</span></span></span></span></span></span></span></span></strong></h2>
<table class="Table TableWordWrap SCX104568718" border="1"><tbody class="SCX104568718">반환 값(비차단)<tr class="TableRow SCX104568718">
<td class="SCX104568718">
<div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div> 특정 시간 반환 값</td>
<td class="SCX104568718">
<div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span> </span> </p></div></div>반환 값(차단)</td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718">ㅋㅋㅋ 대기열 full" )Exception ——</span></p></div></div></td>
</tr>AbstractQueue<tr class="TableRow SCX104568718">
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718">offer(e)//큐가 가득 차지 않으면 true를 반환하고, 큐가 가득 차면 false를 반환합니다. 비차단은 즉시 반환됩니다. <p class="Paragraph SCX104568718"> <span class="TextRun SCX104568718"><span class="TextRun SCX104568718"><span class="TextRun SCX104568718"><span class="TextRun SCX104568718"><span class="SpellingError SCX104568718"><span class="EOP SCX104568718"></span></span></span></span></span>offer(e, time, unit)//지정된 시간 내에 데이터를 큐에 삽입할 수 없으면 false를 반환합니다. 성공하면 true가 반환됩니다. </span> </p>
</div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span>put(e)//큐가 가득 차 있지 않으면 직접 삽입에 대한 반환 값이 없습니다. 큐가 가득 차면 차단되고 대기합니다. 다시 삽입하기 전에 꽉 찼습니다. </span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><div class="cnblogs_code"><pre class="brush:php;toolbar:false">//ArrayBlockingQueue#add public boolean add(E e) { 
  return super.add(e); 
}
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义     return true; 
  else     throw new IllegalStateException(); 
}
//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false public boolean offer(E e) { 
  checkNotNull(e);//检查入队元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lock();//获得锁,线程安全   try { 
    if (count == items.length)//队列满时,不阻塞等待,直接返回false       return false; 
    else { 
      insert(e);//队列未满,直接插入       return true; 
    } 
  } finally {
    lock.unlock();
  }
}
//ArrayBlockingQueue#insert private void insert(E e) { 
  items[putIndex] = x; 
  putIndex = inc(putIndex); 
  ++count; 
  notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》
 }

  在这里有几个ArrayBlockingQueue成员变量。items即队列的数组引用,putIndex表示等待插入的数组下标位置。当items[putIndex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putIndex置为0:

//ArrayBlockingQueue#inc private int inc(int i) { 
  return (++i == items.length) ? 0 : i; 
}

  接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。

//ArrayBlockingQueue#put public void put(E e) throws InterruptedException { 
  checkNotNull(e);//同样检查插入元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == items.length) 
      notFull.await();//当队列满时,使非满等待队列休眠     insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列   } finally { 
    lock.unlock(); 
  } 
}

队列元素的删除 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

 

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public E remove() { 
  E x = poll();//poll方法由Queue接口定义   if (x != null) 
    return x; 
  else     throw new NoSuchElementException(); 
}
//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null public E poll() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return (count == 0) ? null : extract(); 
  } finally { 
    lock.unlock(); 
  } 
}
//ArrayBlockingQueue#extract private E extract() { 
  final Object[] items = this.items; 
  E x = this.<E>cast(items[takeIndex]);//移除队首元素   items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收   takeIndex = inc(takeIndex); 
  --count; 
  notFull.signal();//唤醒非满等待队列线程   return x; 
}

  对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。

//ArrayBlockQueue#take public E take() throws InterruptedException { 
  final ReentrantLock lock = this.lock(); 
  lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == 0)//队列元素为空       notEmpty.await();//非空等待队列休眠     return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列   } finally { 
    lock.unlock(); 
  } 
}

  最后一个方法size。

public int size() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return count; 
  } finally { 
    lock.unlock(); 
  } 
}

  可以看到ArrayBlockingQueue队列的size方法,是直接返回的count变量,它不像ConcurrentLinkedQueueConcurrentLinkedQueue的size则是每次会遍历这个队列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法效率高。而且ConcurrentLinkedQueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了ConcurrentLinkedQueue的size并没有多大的用处。

 

 

위 내용은 동시 패킷 차단 대기열을 위한 ArrayBlockingQueue의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
이전 기사:Spring Cloud 클라이언트 로드 밸런싱 리본다음 기사:Spring Cloud 클라이언트 로드 밸런싱 리본

관련 기사

더보기