Maison >Java >javaDidacticiel >ArrayBlockingQueue pour la file d'attente de blocage de paquets simultané

ArrayBlockingQueue pour la file d'attente de blocage de paquets simultané

巴扎黑
巴扎黑original
2017-06-26 09:31:191284parcourir

jdk1.7.0_79

Dans la section précédente, nous avons fait une brève analyse de la mise en file d'attente et du retrait de la file d'attente non bloquante ConcurrentLinkedQueue dans le package concurrent. file d'attente bloquante dans le package concurrent. Une brève analyse.

JavaLa file d'attente de blocage totale dans le package simultané est 7 Bien sûr, ils sont tous thread-safe.

ArrayBlockingQueue : Un blocage limité constitué d'une file d'attente de structure de tableau .

LinkedBlockingQueue : une file d'attente de blocage limitée composée d'une structure de liste chaînée.

PriorityBlockingQueue : une file d'attente de blocage illimitée qui prend en charge le tri prioritaire.

DealyQueue : une file d'attente de blocage illimitée implémentée à l'aide d'une file d'attente prioritaire.

SynchronousQueue : Une file d'attente de blocage qui ne stocke pas d'éléments.

LinkedTransferQueue : une file d'attente de blocage illimitée composée d'une structure de liste chaînée.

LinkedBlockingDeque : une file d'attente de blocage bidirectionnelle composée d'une structure de liste chaînée . (Extrait de "JavaThe Art of Concurrent Programming") 🎜> Dans cet article faites une brève analyse de ArrayBlockingQueue file d'attente de blocage

Pour ArrayLinkedQueue, en regardant le passé, sa garantie de sécurité est garanti par ReentrantLock, associé

ReentrantLock
, veuillez vous référer à "
5.Lock

Interface et sa mise en œuvreReentrantLock", que je mentionnerai également de manière appropriée ci-dessous.

  首先来查看其构造函数: 

构造方法 

构造方法 

 

public ArrayBlockingQueue(int capacity) 

构造指定大小的有界队列 

public ArrayBlockingQueue(int capacity, boolean fair) 

构造指定大小的有界队列,指定为公平或非公平锁 

public ArrayBlockingQueue(int capacity, boolean fair, Collection c) 

 

构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合 

 

public ArrayBlockingQueue(int capacité)  

构造指定大小的有界队列 

public ArrayBlockingQueue(int capacité, booléen juste) 

构造指定大小的有界队列,指定为公平或非公 

public ArrayBlockingQueue(int capacité, booléen juste, Collection c) 

 

时加入一个集合 
 1 public ArrayBlockingQueue(int capacity) { 
 2   this(capacity, false);//默认构造非公平锁的阻塞队列  3 } 
 4 public ArrayBlockingQueue(int capacity, boolean fair) { 
 5   if (capacity <= 0)  
 6     throw new IllegalArgumentException(); 
 7   this.items = new Object[capacity]; 
 8   lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁  9   notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》10   notFull = lock.newCondition;//初始化非满等待队列 11 } 
12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
13   this(capacity, fair); 
14   final ReentrantLock lock = this.lock; 
15   lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16   try { 
17     int i = 0; 
18     try { 
19       for (E e : c) { 
20         checkNotNull(e); 
21         item[i++] = e;//将集合添加进数组构成的队列中 22       } 
23     } catch (ArrayIndexOutOfBoundsException ex) { 
24       throw new IllegalArgumentException(); 
25     } 
26     count = i;//队列中的实际数据数量 27     putIndex = (i == capacity) ? 0 : i; 
28   } finally { 
29     lock.unlock(); 
30   } 
31 }

À la ligne 15, il y a un commentaire dans le code source : Verrouiller uniquement pour la visibilité, pas mutuel exclusion . Le sens de cette phrase est de préciser que le fonctionnement de cette écluse n'est pas destiné à des opérations d'exclusion mutuelle, mais d'en assurer la visibilité. Le thread T1 est l'objet ArrayBlockingQueue instancié, et T2 est l'opération de mise en file d'attente pour l'objet ArrayBlockingQueue instancié. (Bien entendu, l'ordre d'exécution de T1 et T2 doit être assuré), s'il n'est pas verrouillé (le verrouillage assurera sa visibilité, c'est-à-dire sa réécriture dans la mémoire principale ), La collection c de T1 ne peut exister que dans le cache maintenu par le thread T1 et n'est pas réécrite dans la mémoire principale T2. est instanciéc dans le cache maintenu par 🎜>ArrayBlockingQueue et dans la mémoire principale À l'heure actuelle, une incohérence des données est due à la visibilité, provoquant. problèmes de sécurité des fils .

Voici quelques opérations de sortie et de mise en file d'attente de ArrayBlockingQueue.

File d'attenteÉlément Insertion de

Lance une exception

Valeur de retour (non bloquante)

Valeur de retour dans un certain délai

Valeur de retour (blocage)

Insérer

add(e)//Lorsque la file d'attente n'est pas pleine, renvoie vrai ; lorsque la file d'attente est pleine, une exception IllegalStateException("Queue full") est levée ——AbstractQueue

offer(e)//Lorsque la file d'attente n'est pas pleine, renvoie true lorsque la file d'attente est pleine, renvoie false ; Retours non bloquants immédiatement.

offre (e, heure, unité)//Définir le temps d'attente . Si les données ne peuvent pas être insérées dans la file d'attente dans le délai spécifié, elles renvoient false. Si l'insertion réussit, elles renvoient true.

put(e)//Lorsque la file d'attente n'est pas pleine, insérez directement sans valeur de retour ; lorsque la file d'attente est pleine, elle se bloquera et attendra que la file d'attente ne soit pas pleine avant de l'insérer.

//ArrayBlockingQueue#add public boolean add(E e) { 
  return super.add(e); 
}
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义     return true; 
  else     throw new IllegalStateException(); 
}
//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false public boolean offer(E e) { 
  checkNotNull(e);//检查入队元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lock();//获得锁,线程安全   try { 
    if (count == items.length)//队列满时,不阻塞等待,直接返回false       return false; 
    else { 
      insert(e);//队列未满,直接插入       return true; 
    } 
  } finally {
    lock.unlock();
  }
}
//ArrayBlockingQueue#insert private void insert(E e) { 
  items[putIndex] = x; 
  putIndex = inc(putIndex); 
  ++count; 
  notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》
 }

  在这里有几个ArrayBlockingQueue成员变量。items即队列的数组引用,putIndex表示等待插入的数组下标位置。当items[putIndex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putIndex置为0:

//ArrayBlockingQueue#inc private int inc(int i) { 
  return (++i == items.length) ? 0 : i; 
}

  接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。

//ArrayBlockingQueue#put public void put(E e) throws InterruptedException { 
  checkNotNull(e);//同样检查插入元素是否为空   final ReentrantLock lock = this.lock; 
  lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == items.length) 
      notFull.await();//当队列满时,使非满等待队列休眠     insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列   } finally { 
    lock.unlock(); 
  } 
}

队列元素的删除 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

 

//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public E remove() { 
  E x = poll();//poll方法由Queue接口定义   if (x != null) 
    return x; 
  else     throw new NoSuchElementException(); 
}
//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null public E poll() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return (count == 0) ? null : extract(); 
  } finally { 
    lock.unlock(); 
  } 
}
//ArrayBlockingQueue#extract private E extract() { 
  final Object[] items = this.items; 
  E x = this.<E>cast(items[takeIndex]);//移除队首元素   items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收   takeIndex = inc(takeIndex); 
  --count; 
  notFull.signal();//唤醒非满等待队列线程   return x; 
}

  对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。

//ArrayBlockQueue#take public E take() throws InterruptedException { 
  final ReentrantLock lock = this.lock(); 
  lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。   try { 
    while (count == 0)//队列元素为空       notEmpty.await();//非空等待队列休眠     return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列   } finally { 
    lock.unlock(); 
  } 
}

  最后一个方法size。

public int size() { 
  final ReentrantLock lock = this.lock; 
  lock.lock(); 
  try { 
    return count; 
  } finally { 
    lock.unlock(); 
  } 
}

  可以看到ArrayBlockingQueue队列的size方法,是直接返回的count变量,它不像ConcurrentLinkedQueueConcurrentLinkedQueue的size则是每次会遍历这个队列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法效率高。而且ConcurrentLinkedQueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了ConcurrentLinkedQueue的size并没有多大的用处。

 

 

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn