Heim  >  Artikel  >  Java  >  Detaillierte Erläuterung der Java-Parallelitätssammlung

Detaillierte Erläuterung der Java-Parallelitätssammlung

零下一度
零下一度Original
2017-06-17 11:35:551505Durchsuche

In diesem Artikel wird hauptsächlich die Java-Parallelitätssammlung ConcurrentLinkedQueue vorgestellt. Freunde, die sie benötigen, können sich auf

Einführung in ConcurrentLinkedQueue

ConcurrentLinkedQueue ist eine threadsichere Warteschlange, die für Szenarien mit „hoher Parallelität“ geeignet ist.

Es handelt sich um eine unbegrenzte Thread-sichere Warteschlange, die auf Verbindungsknoten basiert und Elemente nach dem FIFO-Prinzip (First In, First Out) sortiert. Nullelemente können nicht in Warteschlangenelementen platziert werden (außer bei intern implementierten Spezialknoten).


ConcurrentLinkedQueue-Prinzip und Datenstruktur

Die Datenstruktur von ConcurrentLinkedQueue ist wie in der folgenden Abbildung dargestellt:

Erklärung:

1. ConcurrentLinkedQueue erbt von AbstractQueue.

2. ConcurrentLinkedQueue wird intern über eine verknüpfte Liste implementiert. Es enthält sowohl den Kopfknotenkopf als auch den Endknotenschwanz der verknüpften Liste. ConcurrentLinkedQueue sortiert Elemente nach dem FIFO-Prinzip (First In, First Out). Elemente werden vom Ende in die verknüpfte Liste eingefügt und vom Kopf zurückgegeben.

3. Der Typ des nächsten Knotens in der verknüpften Liste von ConcurrentLinkedQueue ist flüchtig, und der Typ des Datenelements der verknüpften Liste ist ebenfalls flüchtig. In Bezug auf flüchtig wissen wir, dass seine Semantik Folgendes umfasst: „Das heißt, beim Lesen einer flüchtigen Variablen kann (jeder Thread) immer sehen, wie zuletzt in diese flüchtige Variable geschrieben wurde.“ ConcurrentLinkedQueue verwendet Volatile, um einen sich gegenseitig ausschließenden Zugriff mehrerer Threads auf konkurrierende Ressourcen zu erreichen.


ConcurrentLinkedQueue-Funktionsliste


// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
Das Folgende wird aus ConcurrentLinkedQueue erstellt, hinzufügen , Löschen Sie diese Aspekte und analysieren Sie sie.

1 Erstellen

Das Folgende wird anhand von ConcurrentLinkedQueue() erläutert.


public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}
Erklärung: Im

Konstruktor wird ein neuer „Knoten mit Nullinhalt“ erstellt und Kopf- und Fußzeile festgelegt Der Wert of tail ist der neue Knoten.

Die Definitionen von Kopf und Schwanz lauten wie folgt:


private transient volatile Node<E> head;
private transient volatile Node<E> tail;
Kopf und Schwanz sind beide flüchtige Typen und haben die Bedeutung, die durch flüchtig angegeben wird : „Das heißt, beim Lesen einer flüchtigen Variablen kann (jeder Thread) immer sehen, wie zuletzt in diese flüchtige Variable geschrieben wurde.“ Die Deklaration von

Node lautet wie folgt:


private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this, itemOffset, item);
 }
 boolean casItem(E cmp, E val) {
 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
 }
 boolean casNext(Node<E> cmp, Node<E> val) {
 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}
Erklärung:

Node ist ein einseitig verknüpfter Listenknoten. next wird verwendet, um auf den nächsten Knoten zu verweisen, und item wird zum Speichern von Daten verwendet. Die APIs zum Betreiben von Knotendaten in Node werden alle über die CAS-Funktion des Unsafe-Mechanismus implementiert. Beispielsweise „vergleicht und legt casNext() den nächsten Knoten des Knotens fest“ über die CAS-Funktion.


2. Hinzufügen

Im Folgenden wird add(E e) als Beispiel verwendet, um das Hinzufügen in ConcurrentLinkedQueue zu erläutern.


public boolean add(E e) {
 return offer(e);
}
Erklärung: add() ruft tatsächlich offer() auf, um den Additionsvorgang abzuschließen.

Der Quellcode von offer() lautet wie folgt:


public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail, p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null, newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t, newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}
Erklärung: Die Funktion von offer(E e) besteht darin, Element e hinzuzufügen bis zum Ende der verknüpften Liste. Der Vergleich von offer() dient dem Verständnis der

for-Schleife. Im Folgenden werden drei Situationen unterschieden, nach denen analysiert werden muss.

Fall 1 – q ist leer. Das bedeutet, dass q der Knoten neben dem Endknoten ist. Setzen Sie zu diesem Zeitpunkt „p's nächsten Knoten auf newNode“ über p.casNext(null, newNode). Wenn die Einstellung erfolgreich ist, vergleichen Sie „p und t“ (wenn p nicht gleich t ist, setzen Sie newNode auf den neuen Endknoten). ), dann gibt true zurück. Andernfalls (d. h. „andere Threads haben den Endknoten geändert“) tun Sie nichts und fahren Sie mit der for-Schleife fort.

p.casNext(null, newNode) ruft CAS auf, um p zu bearbeiten. Wenn „p's nächster Knoten gleich null ist“, dann setzen Sie „p’s nächster Knoten ist gleich newNode“. Wenn die Einstellung erfolgreich ist, wird „true“ zurückgegeben, wenn sie fehlschlägt, wird „false“ zurückgegeben.

Fall 2 – p und q sind gleich. Wann wird das passieren? Durch „Fall 3“ wissen wir, dass nach der Verarbeitung von „Fall 3“ der Wert von p gleich q sein kann.

Wenn sich zu diesem Zeitpunkt der Endknoten nicht geändert hat, sollte sich der Kopfknoten geändert haben. Legen Sie dann p als Kopfknoten fest und durchlaufen Sie die verknüpfte Liste andernfalls erneut (wenn sich der Endknoten ändert). , setze p als Endknoten.

Fall 3 – Andere.

Wir konvertieren p = (p != t && t != (t = tail)) ?


if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}
Wenn p und t gleich sind, setze p auf q. Andernfalls stellen Sie fest, „ob sich der Endknoten geändert hat“. Wenn es keine Änderung gibt, setzen Sie p auf q, andernfalls setzen Sie p auf den Endknoten. Der Quellcode von

checkNotNull() lautet wie folgt:



private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
}
3. Löschen Sie

Die folgenden Schritte poll() als Beispiel Das Löschen in ConcurrentLinkedQueue wird erklärt.


public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head, p = h, q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item, null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h, ((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h, p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:


final void updateHead(Node<E> h, Node<E> p) {
 if (h != p && casHead(h, p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:


void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例


import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // TODO: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+", ");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:


ta1, ta1, tb1, tb1,
ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6,

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der Java-Parallelitätssammlung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn