Maison  >  Article  >  Java  >  Explication détaillée de la collection de concurrence Java

Explication détaillée de la collection de concurrence Java

零下一度
零下一度original
2017-06-17 11:35:551514parcourir

Cet article présente principalement la collection de concurrence Java ConcurrentLinkedQueue. Les amis qui en ont besoin peuvent se référer à

Introduction à ConcurrentLinkedQueue<.>

ConcurrentLinkedQueue est une file d'attente thread-safe, qui convient aux scénarios de « haute concurrence ».

Il s'agit d'une file d'attente thread-safe illimitée basée sur des nœuds de lien, triant les éléments selon le principe FIFO (premier entré, premier sorti). Les éléments nuls ne peuvent pas être placés dans des éléments de file d'attente (sauf pour les nœuds spéciaux implémentés en interne).


Principe de ConcurrentLinkedQueue et structure des données

La structure des données de ConcurrentLinkedQueue est comme indiqué dans la figure ci-dessous :

Explication :

1. ConcurrentLinkedQueue hérite de AbstractQueue.

2. ConcurrentLinkedQueue est implémenté en interne via une liste chaînée. Il contient à la fois le nœud principal head et le nœud tail tail de la liste chaînée. ConcurrentLinkedQueue trie les éléments selon le principe FIFO (premier entré, premier sorti). Les éléments sont insérés dans la liste chaînée depuis la queue et renvoyés depuis la tête.

3. Le type de next dans le nœud de liste chaînée de ConcurrentLinkedQueue est volatile, et le type d'élément de données de la liste chaînée est également volatil. Concernant volatile, nous savons que sa sémantique inclut : "C'est-à-dire que la lecture d'une variable volatile peut toujours voir (n'importe quel thread) la dernière écriture dans cette variable volatile." ConcurrentLinkedQueue utilise volatile pour obtenir un accès mutuellement exclusif aux ressources concurrentes par plusieurs threads.


Liste des fonctions ConcurrentLinkedQueue


// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
Ce qui suit est créé à partir de ConcurrentLinkedQueue, ajoutez , Supprimez ces aspects et analysez-le.

1 Créer

Ce qui suit est expliqué avec ConcurrentLinkedQueue().


public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}
Explication : Dans le

constructeur, un nouveau "nœud avec un contenu nul" est créé, et la tête et le pied de page sont définis La valeur de la queue est le nouveau nœud.

Les définitions de la tête et de la queue sont les suivantes :


private transient volatile Node<E> head;
private transient volatile Node<E> tail;
La tête et la queue sont toutes deux de types volatils, et elles ont la signification donnée par volatile : "Autrement dit, lors d'une lecture d'une variable volatile, vous pouvez toujours voir (n'importe quel thread) la dernière écriture dans cette variable volatile." La déclaration de

Node est la suivante :


private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this, itemOffset, item);
 }
 boolean casItem(E cmp, E val) {
 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
 }
 boolean casNext(Node<E> cmp, Node<E> val) {
 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}
Explication :

Node est un nœud de liste chaînée unidirectionnelle, next est utilisé pour pointer vers le nœud suivant et l'élément est utilisé pour stocker des données. Les API pour exploiter les données du nœud dans Node sont toutes implémentées via la fonction CAS du mécanisme Unsafe ; par exemple, casNext() « compare et définit le nœud suivant du nœud » via la fonction CAS.


2. Ajouter

Ce qui suit utilise add(E e) comme exemple pour expliquer l'ajout dans ConcurrentLinkedQueue.


public boolean add(E e) {
 return offer(e);
}
Explication : add() appelle en fait offer() pour terminer l'opération d'ajout.

Le code source de offer() est le suivant :


public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail, p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null, newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t, newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}
Explication : La fonction de offer(E e) est d'ajouter l'élément e à la fin de la liste chaînée. La comparaison de offer() consiste à comprendre la

boucle for Les analyses suivantes pour en distinguant 3 situations.

Cas 1 -- q est vide. Cela signifie que q est le nœud à côté du nœud de queue. À ce stade, définissez "le nœud suivant de p sur newNode" via p.casNext (null, newNode). Si le réglage réussit, comparez "p et t" (si p n'est pas égal à t, définissez newNode sur le nouveau nœud de queue. ), puis renvoie vrai. Sinon (c'est-à-dire "d'autres threads ont modifié le nœud de queue"), ne faites rien et continuez la boucle for.

p.casNext(null, newNode) appelle CAS pour opérer sur p. Si « le prochain nœud de p est égal à null », alors définissez « le prochain nœud de p est égal à newNode » ; si le paramètre est réussi, il retournera vrai, s'il échoue, il retournera faux.

Cas 2 -- p et q sont égaux. Quand est-ce que cela arrivera ? Grâce au « Cas 3 », on sait qu'après le traitement du « Cas 3 », la valeur de p peut être égale à q.

À ce stade, si le nœud de queue n'a pas changé, alors le nœud de tête aurait dû changer, puis définir p comme nœud de tête, puis parcourir à nouveau la liste chaînée sinon (si le nœud de queue change) ; , définissez p comme nœud de queue.

Cas 3 -- Autres.

On convertit p = (p != t && t != (t = tail)) t : q;


if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}
Si p et t sont égaux, définissez p sur q. Sinon, déterminez « si le nœud de queue a changé ». S'il n'y a pas de changement, définissez p sur q ; sinon, définissez p sur le nœud de queue. Le code source de

checkNotNull() est le suivant :



private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
}
3. poll() à titre d'exemple, la suppression dans ConcurrentLinkedQueue est expliquée.

public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head, p = h, q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item, null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h, ((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h, p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:


final void updateHead(Node<E> h, Node<E> p) {
 if (h != p && casHead(h, p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:


void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例


import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // TODO: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+", ");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:


ta1, ta1, tb1, tb1,
ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6,

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn