Heim  >  Artikel  >  Warum ist ConcurrentHashmap Thread-sicher?

Warum ist ConcurrentHashmap Thread-sicher?

藏色散人
藏色散人Original
2020-03-30 09:21:267948Durchsuche

Warum ist ConcurrentHashmap Thread-sicher?

1. Vergleich zwischen HashMap und ConcurrentHashMap

Wir verwenden einen Code, um die Thread-Unsicherheit von HashMap und die Thread-Sicherheit von ConcurrentHashMap zu beweisen. Die Codelogik ist sehr einfach. Jeder Thread führt eine einfache Operation aus, nämlich das Einfügen eines Schlüssels und das anschließende Löschen eines Schlüssels. Wenn die Thread-Sicherheit erhalten bleibt, muss die endgültige Map-Größe () 0 sein.

Empfohlenes Tutorial: „Java-Lernen

Map<Object,Object> myMap=new HashMap<>();
        // ConcurrentHashMap myMap=new ConcurrentHashMap();
        for (int i = 0; i <10000 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                        double a=Math.random();
                        myMap.put(a,a);
                        myMap.remove(a);
                }
            }).start();
        }
        Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。
        System.out.println(myMap.size());

Die Größe der Karte wird hier angezeigt=13, aber tatsächlich gibt es einen Schlüssel in der Karte. Wir verwenden ConcurrentHashMap, um denselben Code auszuführen, und die Ergebniskarte ==0

Dies beweist, dass ConcurrentHashMap Thread-sicher ist. Lassen Sie uns analysieren, wie ConcurrentHashMap die Thread-Sicherheit aus dem Quellcode gewährleistet Die JDK-Version ist 1,8.

2. ConcurrentHashMap-Quellcode-Analyse

3.1 Grundattribute von ConcurrentHashMap

//默认最大的容量 
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的数组可能长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认的并发级别,目前并没有用,只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap一样,负载因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap一样,链表转换为红黑树的阈值,默认是8
static final int TREEIFY_THRESHOLD = 8;
//红黑树转换链表的阀值,默认是6
static final int UNTREEIFY_THRESHOLD = 6;
//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//table扩容时, 每个线程最少迁移table的槽位个数
private static final int MIN_TRANSFER_STRIDE = 16;
//感觉是用来计算偏移量和线程数量的标记
private static int RESIZE_STAMP_BITS = 16;
//能够调整的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED     = -1;
//TREEBIN, 置为-2, 代表此元素后接红黑树
static final int TREEBIN   = -2;
//感觉是占位符,目前没看出来明显的作用
static final int RESERVED  = -3;
//主要用来计算Hash值的
static final int HASH_BITS = 0x7fffffff; 
//节点数组
transient volatile Node<K,V>[] table;
//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile Node<K,V>[] nextTable;
//基础计数器
private transient volatile long baseCount;
//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化
//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int sizeCtl;
//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int transferIndex;
//扩容时候,CAS锁标记
private transient volatile int cellsBusy;
//计数器表,大小是2次幂
private transient volatile CounterCell[] counterCells;

Die meisten von uns sind die gleichen wie HashMap, aber nur Fügen Sie später einige Attribute hinzu. Lassen Sie uns analysieren, welche Rolle die hinzugefügten Attribute spielen.

2.2 Gemeinsame Methodenattribute von ConcurrentHashMap

Put-Methode

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不允许为null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table没有初始化,进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //计算数组的位置    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置为空,构造新节点添加即可
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }//如果正在扩容
            else if ((fh = f.hash) == MOVED)
            //帮着一起扩容
                tab = helpTransfer(tab, f);
            else {
            //开始真正的插入
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //如果已经初始化完成了
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里key相同直接覆盖原来的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //否则添加到节点的最后面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }//如果节点是树节点,就进行树节点添加操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }//判断节点是否要转换成红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//红黑树转换
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数器,采用CAS计算size大小,并且检查是否需要扩容
        addCount(1L, binCount);
        return null;
    }

Wir haben festgestellt, dass sich die Logik der Put-Methode von ConcurrentHashMap nicht wesentlich von der von HashMap unterscheidet, hauptsächlich aufgrund von Verwenden Sie beim Hinzufügen von Elementen die Synchronisierung, um die Thread-Sicherheit sicherzustellen, und verwenden Sie dann CAS-Operationen, um die Größe zu berechnen. Der gesamte Put-Prozess ist relativ einfach:

1. Stellen Sie fest, ob der Schlüssel und der Wert leer sind, und lösen Sie direkt eine Ausnahme aus.

2. Stellen Sie fest, ob das Tabellenarray initialisiert wurde. Wenn nicht, initialisieren Sie es.

3. Berechnen Sie den Hash-Wert des Schlüssels. Wenn die Position leer ist, erstellen Sie den Knoten direkt und fügen Sie ihn ein.

4. Wenn die Tabelle erweitert wird, geben Sie die Hilfeerweiterungsmethode ein.

Aktivieren Sie abschließend die Synchronisierungssperre und führen Sie den Einfügevorgang durch. Andernfalls wird der erstellte Knoten am Ende hinzugefügt Wenn die Anzahl der Knoten den Rot-Schwarz-Baum-Schwellenwert überschreitet, wird eine Rot-Schwarz-Baum-Konvertierung durchgeführt. Wenn der aktuelle Knoten ein Baumknoten ist, führen Sie einen Baumeinfügevorgang durch.

6. Zählen Sie abschließend die Größe und berechnen Sie, ob eine Erweiterung erforderlich ist.

Get-Methode

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //如果table已经初始化,并且计算hash值的索引位置node不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果hash相等,key相等,直接返回该节点的value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //循环遍历链表,查询key和hash值相等的节点。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

Get-Methode ist wie folgt:

1. Berechnen Sie den Hash-Wert direkt Wenn die gesuchten Knoten gleich sind, reicht es aus, den Knoten direkt zurückzugeben.

2. Wenn die Tabelle erweitert wird, rufen Sie die Suchmethode von ForwardingNode auf, um den Knoten zu finden.

3. Wenn keine Erweiterung vorhanden ist, durchlaufen Sie einfach die verknüpfte Liste und suchen Sie den Knotenwert mit demselben Schlüssel und Hashwert.

Erweiterung von ConcurrentHashMap

Die Erweiterung von ConcurrentHashMap ist im Vergleich zur Erweiterung von HashMap relativ kompliziert, da sie Multithread-Operationen umfasst. Die Erweiterungsmethode ist hier hauptsächlich die Übertragung Erfahren Sie, wie Sie diese Methode erweitern können.

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //保证每个线程扩容最少是16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
            //扩容2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //出现异常情况就不扩容了。
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //用新数组对象接收
            nextTable = nextTab;
            //初始化扩容下表为原数组的长度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //扩容期间的过渡节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //如果该线程已经完成了
                if (--i >= bound || finishing)
                    advance = false;
                //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }CAS操作设置区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//如果扩容完成了,进行收尾工作
                    nextTable = null;//清空临时数组
                    table = nextTab;//赋值原数组
                    sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl
                    return;
                }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //这里应该是判断扩容是否结束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //结束,赋值状态
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }//如果在table中没找到,就用过渡节点
            else if ((f = tabAt(tab, i)) == null)
                //成功设置就进入下一个节点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可
                advance = true; // already processed
            else {
            //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果hash值大于0
                        if (fh >= 0) {
                        //为运算结果
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//如果是树节点,执行树节点的扩容数据转移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //也是通过位运算判断两个链表的位置    
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //这里判断是否进行树转换
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                           //这里把新处理的链表赋值到新数组中
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

Die Erweiterung von ConcurrentHashMap ist immer noch relativ kompliziert. Die Komplexität spiegelt sich hauptsächlich im Grad der Steuerung der Multi-Thread-Erweiterung wider. Einerseits habe ich den Quellcode der Erweiterung nicht im Detail analysiert An manchen Stellen ist es für mich nicht ganz klar. Andererseits denke ich, dass es bei unserer Forschung hauptsächlich darum geht, die Schlüsselcodes und Schlüsselideen zu verstehen Wenn wir eine Reihe ähnlicher Funktionen nicht erneut implementieren, müssen wir uns meiner Meinung nach nicht um alle Details kümmern. Zusammenfassend lauten die Erweiterungsschritte von ConcurrentHashMap wie folgt:

1 Ermitteln Sie die Schrittgröße der Thread-Erweiterung, die mindestens 16 beträgt. Dies ist die Anzahl der Knoten, die ein einzelner Thread für die Erweiterung benötigt.

2. Erstellen Sie ein Array mit der doppelten ursprünglichen Kapazität und erstellen Sie einen Übergangsknoten für Abfragevorgänge während der Kapazitätserweiterung.

3. Führen Sie eine Endlosschleife zum Übertragen von Knoten durch, hauptsächlich basierend auf der Abschlussvariablen, um festzustellen, ob die Erweiterung abgeschlossen ist. Während des Erweiterungszeitraums wird der Erweiterungsvorgang durch Festlegen verschiedener Indizes in der folgenden Tabelle durchgeführt Threads, das heißt, verschiedene Threads, das Array von Operationen Die Segmente sind unterschiedlich, und synchronisierte Synchronisationssperren werden zum Sperren der Betriebsknoten verwendet, um die Thread-Sicherheit zu gewährleisten.

4. Die tatsächliche Position des Knotens im neuen Array entspricht der Erweiterungslogik von HashMap. Durch Bitoperationen wird berechnet, ob sich die neue verknüpfte Liste an der ursprünglichen Position oder an der ursprünglichen Position befindet + Die Länge der Erweiterung finden Sie in diesem Artikel.

3. Zusammenfassung

1. Der größte Teil des Logikcodes von ConcurrentHashMap verwendet hauptsächlich synchronisierte Summen, um die Thread-Sicherheit beim Einfügen und Erweitern von Knoten zu gewährleisten Fragen Sie hier, warum? Warum synchronisiert verwenden? Wie wäre es mit der Verwendung einer optimistischen Sperre, anstatt eine Sperre zu verwenden? Ich persönlich denke, dass es zwei mögliche Gründe gibt:

a. Optimistisches Sperren eignet sich besser für Szenarien mit weniger Konkurrenzkonflikten, was zu ständigen Wiederholungsversuchen führt, was zu einer geringeren Leistung führt.

Nach der Optimierung unterscheidet sich die Leistung von b.synchronized nicht von der von lock und kann in einigen Szenarien schneller sein als die von lock. Ich denke, das ist der Grund für die Verwendung von synchronisiert für die Synchronisierung.

2. Die Kernerweiterungslogik von ConcurrentHashMap besteht darin, verschiedenen Threads unterschiedliche Array-Indizes zuzuweisen, und dann verarbeitet jeder Thread die Knoten in seinen jeweiligen Bereichen in der Tabelle unten. Gleichzeitig verwendet der Verarbeitungsknoten die Logik von hashMap wieder. Durch Bitoperationen können Sie die Position des Knotens nach der Erweiterung entweder an der ursprünglichen Position oder an der ursprünglichen Position + der alten Längenposition ermitteln und den Wert schließlich direkt zuweisen .

Das obige ist der detaillierte Inhalt vonWarum ist ConcurrentHashmap Thread-sicher?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn