Home >Java >javaTutorial >Detailed explanation of Java concurrency collection

Detailed explanation of Java concurrency collection

零下一度
零下一度Original
2017-06-17 11:35:551553browse

This article mainly introduces the Java concurrency collection ConcurrentLinkedQueue. Friends who need it can refer to

Introduction to ConcurrentLinkedQueue

ConcurrentLinkedQueue is a thread-safe queue, which is suitable for "high concurrency" scenarios.

It is an unbounded thread-safe queue based on link nodes, sorting elements according to the FIFO (first in, first out) principle. Null elements cannot be placed in queue elements (except for special nodes implemented internally).

ConcurrentLinkedQueue principle and data structure

The data structure of ConcurrentLinkedQueue is as shown in the figure below:

Description:

1. ConcurrentLinkedQueue inherits from AbstractQueue.

2. ConcurrentLinkedQueue is implemented internally through a linked list. It contains both the head node head and the tail node tail of the linked list. ConcurrentLinkedQueue sorts elements according to the FIFO (first in, first out) principle. Elements are inserted into the linked list from the tail and returned from the head.

3. The type of next in the linked list Node of ConcurrentLinkedQueue is volatile, and the type of linked list data item is also volatile. Regarding volatile, we know that its semantics include: "That is, reading a volatile variable can always see (any thread) the last write to this volatile variable." ConcurrentLinkedQueue uses volatile to achieve mutually exclusive access to competing resources by multiple threads.

ConcurrentLinkedQueue function list


// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)

The following is the creation, addition and deletion of ConcurrentLinkedQueue. Analyze it from several aspects.

1 Create

The following uses ConcurrentLinkedQueue() to explain.


public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}

Explanation: In the constructor, a new "node with null content" is created, and the head and tail of the table are set. The value is the new node.

The definitions of head and tail are as follows:


private transient volatile Node<E> head;
private transient volatile Node<E> tail;

head and tail are both volatile types, and they have the meaning given by volatile: "That is, for a volatile variable A read can always see (any thread) the last write to this volatile variable."

The declaration of Node is as follows:


private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this, itemOffset, item);
 }
 boolean casItem(E cmp, E val) {
 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
 }
 boolean casNext(Node<E> cmp, Node<E> val) {
 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}

Explanation:

Node is a one-way linked list node, next is used to point to the next Node, item is used to store data. The APIs for operating node data in Node are all implemented through the CAS function of the Unsafe mechanism; for example, casNext() "compares and sets the next node of the node" through the CAS function.

2. Add

The following uses add(E e) as an example to explain the addition in ConcurrentLinkedQueue.


public boolean add(E e) {
 return offer(e);
}

Explanation: add() actually calls offer() to complete the adding operation.

The source code of offer() is as follows:


public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail, p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null, newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t, newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}

Description: The function of offer(E e) is to add element e to the end of the linked list. The comparison of offer() is to understand for loop. The following distinguishes 3 situations to analyze for.

Case 1 -- q is empty. This means that q is the node next to the tail node. At this time, set "p's next node to newNode" through p.casNext(null, newNode). If the setting is successful, compare "p and t" (if p is not equal to t, set newNode to the new tail node), then returns true. Otherwise (meaning "other threads modified the tail node"), do nothing and continue the for loop.

p.casNext(null, newNode) calls CAS to operate on p. If "p's next node is equal to null", then set "p's next node is equal to newNode"; if the setting is successful, it will return true, if it fails, it will return false.

Case 2 -- p and q are equal. When will this happen? Through "Case 3", we know that after the processing of "Case 3", the value of p may be equal to q.

At this time, if the tail node has not changed, then the head node should have changed, then set p as the head node, and then traverse the linked list again; otherwise (if the tail node changes), set p as tail node.

Case 3 -- Others.

We convert p = (p != t && t != (t = tail)) ? t : q; into the following code.


if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}

If p and t are equal, set p to q. Otherwise, determine "whether the tail node has changed". If there is no change, set p to q; otherwise, set p to the tail node.

The source code of checkNotNull() is as follows:


private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
}

3. Delete

The following uses poll() as an example to configure ConcurrentLinkedQueue Deletion in .


public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head, p = h, q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item, null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h, ((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h, p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:


final void updateHead(Node<E> h, Node<E> p) {
 if (h != p && casHead(h, p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:


void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例


import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // TODO: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+", ");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:


ta1, ta1, tb1, tb1,
ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6,

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

The above is the detailed content of Detailed explanation of Java concurrency collection. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn