Heim  >  Artikel  >  Java  >  ArrayBlockingQueue für gleichzeitige Paketblockierungswarteschlange

ArrayBlockingQueue für gleichzeitige Paketblockierungswarteschlange

巴扎黑
巴扎黑Original
2017-06-26 09:31:191276Durchsuche

jdk1.7.0_79

Im vorherigen Abschnitt haben wir eine kurze Analyse der Einreihung und Entnahme der nicht blockierenden Warteschlange ConcurrentLinkedQueue im Concurrent-Paket durchgeführt Blockierungswarteschlange im gleichzeitigen Paket.

JavaDie gesamte Blockierungswarteschlange im gleichzeitigen Paket beträgt 7 Natürlich sind sie alle threadsicher.

ArrayBlockingQueue: Eine begrenzte Blockierung, die aus einer Array-Strukturwarteschlange besteht .

LinkedBlockingQueue: Eine begrenzte Blockierungswarteschlange, die aus einer verknüpften Listenstruktur besteht.

PriorityBlockingQueue: Eine unbegrenzte Blockierungswarteschlange, die die Prioritätssortierung unterstützt.

DealyQueue: Eine unbegrenzte Blockierungswarteschlange, die mithilfe einer Prioritätswarteschlange implementiert wird.

SynchronousQueue: Eine blockierende Warteschlange, die keine Elemente speichert.

LinkedTransferQueue: Eine unbegrenzte Blockierungswarteschlange, die aus einer verknüpften Listenstruktur besteht.

LinkedBlockingDeque: Eine bidirektionale Blockierungswarteschlange, die aus einer verknüpften Listenstruktur besteht . (Auszug aus „JavaThe Art of Concurrent Programming“) 🎜> Machen Sie in diesem Artikel eine kurze Analyse der ArrayBlockingQueue Blockierungswarteschlange

Für ArrayLinkedQueue gilt in der Vergangenheit eine Sicherheitsgarantie garantiert durch ReentrantLock, verwandt

ReentrantLock
siehe „
5.Lock

Schnittstelle und ihre ImplementierungReentrantLock“, die ich im Folgenden auch entsprechend erwähnen werde.

  首先来查看其构造函数: 

构造方法 

构造方法 

 

public ArrayBlockingQueue(int capacity) 

构造指定大小的有界队列 

public ArrayBlockingQueue(int capacity, boolean fair) 

构造指定大小的有界队列,指定为公平或非公平锁 

public ArrayBlockingQueue(int capacity, boolean fair, Collection c) 

 

构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合 

 

public ArrayBlockingQueue(int capacity) 

public ArrayBlockingQueue(int capacity, boolean fair) 

Sie können dies auch tun锁 

public ArrayBlockingQueue(int Kapazität, boolean fair, Collection c) 

 

Sie können dies auch tun时加入一个集合 
 1 public ArrayBlockingQueue(int capacity) { 
 2   this(capacity, false);//默认构造非公平锁的阻塞队列  3 } 
 4 public ArrayBlockingQueue(int capacity, boolean fair) { 
 5   if (capacity <= 0)  
 6     throw new IllegalArgumentException(); 
 7   this.items = new Object[capacity]; 
 8   lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁  9   notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》10   notFull = lock.newCondition;//初始化非满等待队列 11 } 
12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
13   this(capacity, fair); 
14   final ReentrantLock lock = this.lock; 
15   lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16   try { 
17     int i = 0; 
18     try { 
19       for (E e : c) { 
20         checkNotNull(e); 
21         item[i++] = e;//将集合添加进数组构成的队列中 22       } 
23     } catch (ArrayIndexOutOfBoundsException ex) { 
24       throw new IllegalArgumentException(); 
25     } 
26     count = i;//队列中的实际数据数量 27     putIndex = (i == capacity) ? 0 : i; 
28   } finally { 
29     lock.unlock(); 
30   } 
31 }

In Zeile 15 gibt es einen Kommentar im Quellcode: Nur für Sichtbarkeit sperren, nicht gegenseitig Ausschluss . Die Bedeutung dieses Satzes besteht darin, anzugeben, dass der Betrieb dieser Sperre nicht zum gegenseitigen Ausschluss dient, sondern dazu dient, ihre Sichtbarkeit sicherzustellen. Thread T1 ist das instanziierte ArrayBlockingQueue-Objekt und T2 ist die Warteschlangenoperation für das instanziierte ArrayBlockingQueue-Objekt. (Natürlich muss die Ausführungsreihenfolge von T1 und T2 sichergestellt werden), wenn es nicht gesperrt ist (das Sperren stellt seine Sichtbarkeit sicher, d. h. das Zurückschreiben in den Hauptspeicher). ), Die Sammlung c von T1 darf nur im Cache vorhanden sein, der vom T1-Thread verwaltet wird, und wird nicht in den Hauptspeicher zurückgeschrieben wird instanziiertc im von 🎜>ArrayBlockingQueue verwalteten Cache. Zu diesem Zeitpunkt kommt es aufgrund der Sichtbarkeit zu Dateninkonsistenzen Thread-Sicherheitsprobleme .

Im Folgenden sind einige Dequeue- und Inqueue-Vorgänge von

ArrayBlockingQueue aufgeführt.

WarteschlangeElement Einfügung von

Wirft eine Ausnahme aus

Rückgabewert (nicht blockierend)

Wert innerhalb eines bestimmten Zeitraums zurückgeben

Rückgabewert (Blockierung)

Einfügen

add(e)//Wenn die Warteschlange nicht voll ist, wird true zurückgegeben, wenn die Warteschlange voll ist, wird eine IllegalStateException(„Warteschlange voll“) ausgelöst ——AbstractQueue

offer(e)//Wenn die Warteschlange nicht voll ist, geben Sie true zurück, wenn die Warteschlange voll ist, geben Sie false zurück. Nicht blockierende Rückgabe sofort.

Angebot(e, Zeit, Einheit)//Stellen Sie die Wartezeit ein Wenn die Daten nicht innerhalb der angegebenen Zeit in die Warteschlange eingefügt werden können, wird false zurückgegeben. Wenn das Einfügen erfolgreich ist, wird true zurückgegeben.

put(e)//Wenn die Warteschlange nicht voll ist, einfügen direkt ohne Rückgabewert; wenn die Warteschlange voll ist, wird blockiert und vor dem Einfügen gewartet, bis die Warteschlange nicht mehr voll ist.

//ArrayBlockingQueue#add public boolean add(E e) { 
  return super.add(e); 
}
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义     return true; 
  else     throw new IllegalStateException(); 
}
//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false public boolean offer(E e) { 
  checkNotNull(e);//检查入队元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lock();//获得锁,线程安全   try { 
    if (count == items.length)//队列满时,不阻塞等待,直接返回false       return false; 
    else { 
      insert(e);//队列未满,直接插入       return true; 
    } 
  } finally {
    lock.unlock();
  }
}
//ArrayBlockingQueue#insert private void insert(E e) { 
  items[putIndex] = x; 
  putIndex = inc(putIndex); 
  ++count; 
  notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》
 }

  在这里有几个ArrayBlockingQueue成员变量。items即队列的数组引用,putIndex表示等待插入的数组下标位置。当items[putIndex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putIndex置为0:

//ArrayBlockingQueue#inc private int inc(int i) { 
  return (++i == items.length) ? 0 : i; 
}

  接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。

//ArrayBlockingQueue#put public void put(E e) throws InterruptedException { 
  checkNotNull(e);//同样检查插入元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == items.length) 
      notFull.await();//当队列满时,使非满等待队列休眠     insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列   } finally { 
    lock.unlock(); 
  } 
}

队列元素的删除 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

 

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public E remove() { 
  E x = poll();//poll方法由Queue接口定义   if (x != null) 
    return x; 
  else     throw new NoSuchElementException(); 
}
//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null public E poll() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return (count == 0) ? null : extract(); 
  } finally { 
    lock.unlock(); 
  } 
}
//ArrayBlockingQueue#extract private E extract() { 
  final Object[] items = this.items; 
  E x = this.<E>cast(items[takeIndex]);//移除队首元素   items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收   takeIndex = inc(takeIndex); 
  --count; 
  notFull.signal();//唤醒非满等待队列线程   return x; 
}

  对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。

//ArrayBlockQueue#take public E take() throws InterruptedException { 
  final ReentrantLock lock = this.lock(); 
  lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == 0)//队列元素为空       notEmpty.await();//非空等待队列休眠     return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列   } finally { 
    lock.unlock(); 
  } 
}

  最后一个方法size。

public int size() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return count; 
  } finally { 
    lock.unlock(); 
  } 
}

  可以看到ArrayBlockingQueue队列的size方法,是直接返回的count变量,它不像ConcurrentLinkedQueueConcurrentLinkedQueue的size则是每次会遍历这个队列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法效率高。而且ConcurrentLinkedQueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了ConcurrentLinkedQueue的size并没有多大的用处。

 

 

Das obige ist der detaillierte Inhalt vonArrayBlockingQueue für gleichzeitige Paketblockierungswarteschlange. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn