一、HashMap和ConcurrentHashMap的對比
#我們用一段程式碼證明下HashMap的執行緒不安全,以及ConcurrentHashMap的線程安全性。程式碼邏輯很簡單,開啟10000個線程,每個線程做很簡單的操作,就是put一個key,然後刪除一個key,理論上線程安全的情況下,最後map的size()肯定為0。
推薦教學:《java學習》
Map<Object,Object> myMap=new HashMap<>(); // ConcurrentHashMap myMap=new ConcurrentHashMap(); for (int i = 0; i <10000 ; i++) { new Thread(new Runnable() { @Override public void run() { double a=Math.random(); myMap.put(a,a); myMap.remove(a); } }).start(); } Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。 System.out.println(myMap.size());
這裡顯示Map的size=13,但其實map裡還有一個key。同樣的程式碼我們用ConcurrentHashMap來運行下,結果map ==0
這裡也就證明了ConcurrentHashMap是線程安全的,我們接下來從源碼分析下ConcurrentHashMap是如何保證線程安全的,本次源碼jdk版本為1.8。
二、ConcurrentHashMap原始碼分析
3.1 ConcurrentHashMap的基礎屬性
//默认最大的容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //默认初始化的容量 private static final int DEFAULT_CAPACITY = 16; //最大的数组可能长度 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默认的并发级别,目前并没有用,只是为了保持兼容性 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //和hashMap一样,负载因子 private static final float LOAD_FACTOR = 0.75f; //和HashMap一样,链表转换为红黑树的阈值,默认是8 static final int TREEIFY_THRESHOLD = 8; //红黑树转换链表的阀值,默认是6 static final int UNTREEIFY_THRESHOLD = 6; //进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容 static final int MIN_TREEIFY_CAPACITY = 64; //table扩容时, 每个线程最少迁移table的槽位个数 private static final int MIN_TRANSFER_STRIDE = 16; //感觉是用来计算偏移量和线程数量的标记 private static int RESIZE_STAMP_BITS = 16; //能够调整的最大线程数量 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //记录偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //值为-1, 当Node.hash为MOVED时, 代表着table正在扩容 static final int MOVED = -1; //TREEBIN, 置为-2, 代表此元素后接红黑树 static final int TREEBIN = -2; //感觉是占位符,目前没看出来明显的作用 static final int RESERVED = -3; //主要用来计算Hash值的 static final int HASH_BITS = 0x7fffffff; //节点数组 transient volatile Node<K,V>[] table; //table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上 private transient volatile Node<K,V>[] nextTable; //基础计数器 private transient volatile long baseCount; //table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化 //-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小 private transient volatile int sizeCtl; //table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标 private transient volatile int transferIndex; //扩容时候,CAS锁标记 private transient volatile int cellsBusy; //计数器表,大小是2次幂 private transient volatile CounterCell[] counterCells;
上面就是ConcurrentHashMap的基本屬性,我們大部分和HashMap一樣,只是增加了部分屬性,後面我們來分析增加的部分屬性是起到如何的作用的。
2.2 ConcurrentHashMap的常用方法屬性
put方法
final V putVal(K key, V value, boolean onlyIfAbsent) { //key和value不允许为null if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table没有初始化,进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //计算数组的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果该位置为空,构造新节点添加即可 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }//如果正在扩容 else if ((fh = f.hash) == MOVED) //帮着一起扩容 tab = helpTransfer(tab, f); else { //开始真正的插入 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //如果已经初始化完成了 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //这里key相同直接覆盖原来的节点 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //否则添加到节点的最后面 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }//如果节点是树节点,就进行树节点添加操作 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } }//判断节点是否要转换成红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i);//红黑树转换 if (oldVal != null) return oldVal; break; } } } //计数器,采用CAS计算size大小,并且检查是否需要扩容 addCount(1L, binCount); return null; }
我們發現ConcurrentHashMap的put方法和HashMap的邏輯相差不大,主要是新增了執行緒安全性部分,在加入元素時候,採用synchronized來確保執行緒安全,然後計算size的時候採用CAS運算進行計算。整個put流程比較簡單,總結下就是:
1.判斷key和vaule是否為空,如果為空,直接拋出例外。
2.判斷table數組是否已經初始化完畢,如果沒有初始化,進行初始化。
3.計算key的hash值,若位置為空,直接建構節點放入。
4.如果table正在擴容,進入幫助擴容方法。
5.最後開啟同步鎖,進行插入操作,如果開啟了覆蓋選項,直接覆蓋,否則,構造節點添加到尾部,如果節點數超過紅黑樹閾值,進行紅黑樹轉換。如果目前節點是樹節點,進行樹插入操作。
6.最後統計size大小,並計算是否需要擴容。
get方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //如果table已经初始化,并且计算hash值的索引位置node不为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果hash相等,key相等,直接返回该节点的value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //循环遍历链表,查询key和hash值相等的节点。 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get方法比較簡單,主要流程如下:
1.直接計算hash值,查找的節點如果key和hash值相等,直接傳回該節點的value就行。
2.如果table正在擴容,就呼叫ForwardingNode的find方法找節點。
3.如果沒有擴容,直接循環遍歷鍊錶,查找到key和hash值一樣的節點值即可。
ConcurrentHashMap的擴容
ConcurrentHashMap的擴容相對於HashMap的擴容相對複雜,因為涉及了多執行緒操作,這裡擴容方法主要是transfer,我們來分析下這個方法的源碼,研究下方是如何擴容的。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //保证每个线程扩容最少是16, if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { //扩容2倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //出现异常情况就不扩容了。 sizeCtl = Integer.MAX_VALUE; return; } //用新数组对象接收 nextTable = nextTab; //初始化扩容下表为原数组的长度 transferIndex = n; } int nextn = nextTab.length; //扩容期间的过渡节点 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //如果该线程已经完成了 if (--i >= bound || finishing) advance = false; //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; }CAS操作设置区间 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {//如果扩容完成了,进行收尾工作 nextTable = null;//清空临时数组 table = nextTab;//赋值原数组 sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl return; }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //这里应该是判断扩容是否结束 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //结束,赋值状态 finishing = advance = true; i = n; // recheck before commit } }//如果在table中没找到,就用过渡节点 else if ((f = tabAt(tab, i)) == null) //成功设置就进入下一个节点 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可 advance = true; // already processed else { //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //如果hash值大于0 if (fh >= 0) { //为运算结果 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章 if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; }//如果是树节点,执行树节点的扩容数据转移 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); //也是通过位运算判断两个链表的位置 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //这里判断是否进行树转换 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //这里把新处理的链表赋值到新数组中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
ConcurrentHashMap的擴容還是比較複雜,複雜主要表現在,控制多線程擴容層面上,擴容的源碼我沒有解析的很細,一方面是確實比較複雜,本人有某些地方也不是太明白,另一方面是我覺得我們研究主要是弄懂其思想,能搞明白關鍵代碼和關鍵思路即可,只要不是重新實現一套類似的功能,我想就不用糾結其全部細節了。總結下ConcurrentHashMap的擴容步驟如下:
1.取得執行緒擴容處理步長,最少是16,也就是單一執行緒處理擴容的節點個數。
2.新建一個原來容量2倍的數組,建構過渡節點,用於擴容期間的查詢操作。
3.進行死循環進行轉移節點,主要根據finishing變數判斷是否擴容結束,在擴容期間透過給不同的線程設定不同的下表索引進行擴容操作,就是不同的線程,操作的數組分段不一樣,同時利用synchronized同步鎖定來鎖住操作的節點,確保了執行緒安全。
4.真正進行節點在新數組的位置是和HashMap擴容邏輯一樣的,透過位元運算計算出新鍊錶是否位於原位置或位於原位置擴容的長度位置,具體分析可以查看我的這篇文章。
三、總結
1.ConcurrentHashMap大部分的邏輯程式碼和HashMap是一樣的,主要透過synchronized和來保證節點插入擴容的線程安全,這裡肯定有同學會問,為啥使用synchronized呢?而不用採取樂觀鎖,或lock呢?我個人覺得可能原因有2點:
a.樂觀鎖比較適用於競爭衝突比較少的場景,如果衝突比較多,那麼就會導致不停的重試,這樣反而性能更低。
b.synchronized在經歷了優化之後,其實效能已經和lock沒啥差異了,某些場景可能還比lock快。所以,我覺得這是採用synchronized來同步的原因。
2.ConcurrentHashMap的擴容核心邏輯主要是給不同的執行緒分配不同的陣列下標,然後每個執行緒處理各自下表區間的節點。同時處理節點復用了hashMap的邏輯,透過位元運行,可以知道節點擴容後的位置,要麼在原位置,要麼在原位置 oldlength位置,最後直接賦值即可。
以上是concurrenthashmap為什麼是線程安全的詳細內容。更多資訊請關注PHP中文網其他相關文章!