首頁 >Java >java教程 >Java concurrency集合的詳解

Java concurrency集合的詳解

零下一度
零下一度原創
2017-06-17 11:35:551547瀏覽

這篇文章主要介紹了Java concurrency集合之ConcurrentLinkedQueue,需要的朋友可以參考下

ConcurrentLinkedQueue介紹

ConcurrentLinkedQueue是線程安全的隊列,它適用於「高並發」的場景。

它是一個基於連結節點的無界線程安全佇列,按照 FIFO(先進先出)原則對元素進行排序。佇列元素中不可以放置null元素(內部實作的特殊節點除外)。

ConcurrentLinkedQueue原理與資料結構

ConcurrentLinkedQueue的資料結構,如下圖所示:

說明:

1. ConcurrentLinkedQueue繼承於AbstractQueue。

2. ConcurrentLinkedQueue內部是透過鍊錶來實現的。它同時包含鍊錶的頭節點head和尾節點tail。 ConcurrentLinkedQueue依照 FIFO(先進先出)原則對元素進行排序。元素都是從尾部插入到鍊錶,從頭部開始返回。

3. ConcurrentLinkedQueue的鍊錶Node中的next的型別是volatile,而且鍊錶資料item的型別也是volatile。關於volatile,我們知道它的語意包含:「即對一個volatile變數的讀,總是能看到(任意線程)對這個volatile變數最後的寫入」。 ConcurrentLinkedQueue就是透過volatile來實現多執行緒對競爭資源的互斥存取的。

ConcurrentLinkedQueue函數列表


// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)

下面從ConcurrentLinkedQueue的創建,添加,刪除這幾個方面對它進行分析。

1 建立

下面以ConcurrentLinkedQueue()來進行說明。


public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}

說明:在建構子中,新建了一個“內容為null的節點”,並設定表頭head和表尾tail的值為新節點。

head和tail的定義如下:


private transient volatile Node<E> head;
private transient volatile Node<E> tail;

head和tail都是volatile類型,他們具有volatile賦予的含義:「即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變數最後的寫入」。

Node的宣告如下:


private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this, itemOffset, item);
 }
 boolean casItem(E cmp, E val) {
 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
 }
 boolean casNext(Node<E> cmp, Node<E> val) {
 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}

說明:

Node是個單向鍊錶節點,next用來指向下一個Node, item用於儲存資料。 Node中操作節點資料的API,都是透過Unsafe機制的CAS函數實現的;例如casNext()是透過CAS函數「比較並設定節點的下一個節點」。

2. 新增

下面以add(E e)為例對ConcurrentLinkedQueue中的新增進行說明。


public boolean add(E e) {
 return offer(e);
}

說明:add()其實是呼叫的offer()來完成新增操作的。

offer()的原始碼如下:


public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail, p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null, newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t, newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}

說明:offer(E e)的作用是將元素e加到鍊錶的末端。 offer()比較的地方是理解for迴圈,以下區分3種情況對for進行分析。

情況1 -- q為空。這意味著q是尾節點的下一個節點。此時,透過p.casNext(null, newNode)將“p的下一個節點設為newNode”,若設定成功的話,則比較“p和t”(若p不等於t,則設定newNode為新的尾節點),然後回傳true。否則的話(意味著「其它線程對尾節點進行了修改」),什麼也不做,繼續進行for迴圈。

p.casNext(null, newNode),是呼叫CAS對p進行操作。若“p的下一個節點等於null”,則設定“p的下一個節點等於newNode”;設定成功的話,回傳true,失敗的話回傳false。

情況2 -- p和q相等。這種情況什麼時候會發生呢?透過“情況3”,我們知道,經過“情況3”的處理後,p的值可能等於q。

此時,若尾節點沒有發生變化的話,那麼,應該是頭節點發生了變化,則設定p為頭節點,然後重新遍歷鍊錶;否則(尾節點變化的話),則設定p為尾節點。

情況3 -- 其它。

我們將p = (p != t && t != (t = tail)) ? t : q;轉換成如下程式碼。


if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}

如果p和t相等,則設定p為q。否則的話,判斷“尾節點是否發生變化”,沒有變化的話,則設定p為q;否則,設定p為尾節點。

checkNotNull()的原始碼如下:


private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
}

3. 刪除

以下以poll()為例對ConcurrentLinkedQueue中的刪除進行說明。


public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head, p = h, q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item, null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h, ((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h, p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:


final void updateHead(Node<E> h, Node<E> p) {
 if (h != p && casHead(h, p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:


void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例


import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // TODO: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+", ");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:


ta1, ta1, tb1, tb1,
ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6,

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

以上是Java concurrency集合的詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn