Heim  >  Artikel  >  Java  >  Wie ist der Ausführungsstatus von LongAdder in der gleichzeitigen Java-Programmierung?

Wie ist der Ausführungsstatus von LongAdder in der gleichzeitigen Java-Programmierung?

WBOY
WBOYnach vorne
2023-05-09 12:52:15718Durchsuche

longAccumulate-Methode

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

Der Code ist lang. Lassen Sie uns ihn in Abschnitten analysieren. Stellen Sie zunächst den Inhalt jedes Teils vor.

  • Teil 1: for Der Code vor der Schleife, hauptsächlich zum Erhalten des Hashs Wert des Threads, wenn er 0 ist, erzwinge die Initialisierungfor循环之前的代码,主要是获取线程的hash值,如果是0的话就强制初始化

  • 第二部分:for循环中第一个if语句,在Cell数组中进行累加、扩容

  • 第三部分:for循环中第一个else if语句,这部分的作用是创建Cell数组并初始化

  • 第四部分:for循环中第二个else if语句,当Cell数组竞争激烈时尝试在base上进行累加

线程hash值

int h; 
if ((h = getProbe()) == 0) { 
    ThreadLocalRandom.current(); // force initialization 
    h = getProbe(); 
    wasUncontended = true;   // true表示没有竞争
} 
boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容

这部分的核心代码是getProbe方法,这个方法的作用就是获取线程的hash值,方便后面通过位运算定位到Cell数组中某个位置,如果是0的话就会进行强制初始化

初始化Cell数组

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 省略...
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {  // 获取锁
            boolean init = false;  // 初始化标志
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];  // 创建Cell数组
                    rs[h & 1] = new Cell(x);  // 索引1位置创建Cell元素,值为x=1
                    cells = rs;   // cells指向新数组
                    init = true;  // 初始化完成
                }
            } finally {
                cellsBusy = 0;  // 释放锁
            }
            if (init)
                break;  // 跳出循环
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

第一种情况下Cell数组为null,所以会进入第一个else if语句,并且没有其他线程进行操作,所以cellsBusy==0cells==as也是truecasCellsBusy()尝试对cellsBusy进行cas操作改成1也是true

首先创建了一个有两个元素的Cell数组,然后通过线程h & 1 的位运算在索引1的位置设置一个value1Cell,然后重新赋值给cells,标记初始化成功,修改cellsBusy0表示释放锁,最后跳出循环,初始化操作就完成了。

对base进行累加

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 省略...
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 省略...
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

第二个else if语句的意思是当Cell数组中所有位置竞争都很激烈时,就尝试在base上进行累加,可以理解为最后的保障

Cell数组初始化之后

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {  // as初始化之后满足条件
            if ((a = as[(n - 1) & h]) == null) {  // as中某个位置的值为null
                if (cellsBusy == 0) {       // Try to attach new Cell 是否加锁
                    Cell r = new Cell(x);   // Optimistically create 创建新Cell
                    if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁
                        boolean created = false;  // 
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {  // 重新检查该位置是否为null
                                rs[j] = r;  // 该位置添加Cell元素
                                created = true;  // 新Cell创建成功
                            }
                        } finally {
                            cellsBusy = 0;  // 释放锁
                        }
                        if (created)
                            break;  // 创建成功,跳出循环
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;  // 扩容标志
            }
            else if (!wasUncontended)       // 上面定位到的索引位置的值不为null
                wasUncontended = true;      // 重新计算hash,重新定位其他索引位置重试
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))  // 尝试在该索引位置进行累加
                break;
            else if (n >= NCPU || cells != as)  // 如果数组长度大于等于CPU核心数,就不能在扩容
                collide = false;            // At max size or stale
            else if (!collide)  // 数组长度没有达到最大值,修改扩容标志可以扩容
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];  // 创建一个原来长度2倍的数组
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];  // 把原来的元素拷贝到新数组中
                        cells = rs;  // cells指向新数组
                    }
                } finally {
                    cellsBusy = 0;  // 释放锁
                }
                collide = false;  // 已经扩容完成,修改标志不用再扩容
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);  // 重新获取hash值
        }
        // 省略...
}

根据代码中的注释分析一遍整体逻辑

  • 首先如果找到数组某个位置上的值为null,说明可以在这个位置进行操作,就创建一个新的Cell并初始化值为1放到这个位置,如果失败了就重新计算hash值再重试

  • 定位到的位置已经有值了,说明线程之间产生了竞争,如果wasUncontendedfalse就修改为true,并重新计算hash重试

  • 定位的位置有值并且wasUncontended已经是true,就尝试在该位置进行累加

  • 当累加失败时,判断数组容量是否已经达到最大,如果是就不能进行扩容,只能rehash并重试

  • 如果前面条件都不满足,并且扩容标志collide标记为false的话就修改为true,表示可以进行扩容,然后rehash重试

  • 首先尝试加锁,成功了就进行扩容操作,每次扩容长度是之前的2

Teil 2: Die erste if-Anweisung in der for-Schleife in der Cell code>-Array Akkumulieren und erweitern 🎜🎜🎜🎜Teil 3: Die erste <code>else if-Anweisung in der for-Schleife. Die Funktion dieses Teils besteht darin, eine Zelle zu erstellen Array und initialisieren 🎜🎜🎜🎜Teil 4: Die zweite else if-Anweisung in der for-Schleife, wenn der Wettbewerb um die Zelleerfolgt > Array ist heftig, versuchen Sie, auf base zu akkumulieren🎜🎜🎜🎜Thread-Hash-Wert🎜rrreee🎜Der Kerncode dieses Teils ist die Methode getProbe Um den hash-Wert des Threads zu erhalten, ist es praktisch, später eine bestimmte Position im <code>Cell-Array durch Bitoperationen zu lokalisieren , wird eine erzwungene Initialisierung durchgeführt. code>-Anweisung wird eingegeben und kein anderer Thread wird ausgeführt, daher ist cellsBusy==0, cells==as auch true, casCellsBusy() versucht, den cellsBusy code>Vorgang cas auszuführen und ihn in 1 zu ändern, was ebenfalls true ist . 🎜🎜Erstellen Sie zunächst ein Cell-Array mit zwei Elementen und setzen Sie es dann durch die Bitoperation von Thread h & 1an die Position des Index 1 > Eine Zelle, deren Wert 1 ist und die dann Zellen neu zugewiesen wird, markiert die Initialisierung als erfolgreich und ändert cellsBusy ist 0, um die Sperre aufzuheben, schließlich aus der Schleife zu springen und den Initialisierungsvorgang abzuschließen. 🎜🎜Basis akkumulieren🎜rrreee🎜Die zweite else if-Anweisung bedeutet, dass, wenn der Wettbewerb um alle Positionen im Cell-Array hart ist, eine Akkumulation auf Basis versucht werden kann als endgültige Garantie verstanden werden🎜🎜Nachdem das Cell-Array initialisiert wurde🎜rrreee🎜analysieren Sie die Gesamtlogik gemäß den Kommentaren im Code🎜🎜🎜🎜Zunächst einmal, ob der Wert an einer bestimmten Position im Array gefunden wird null gibt an, dass der Vorgang an dieser Stelle ausgeführt werden kann. Erstellen Sie eine neue Zelle, initialisieren Sie den Wert auf 1 und platzieren Sie ihn an dieser Stelle . Wenn dies fehlschlägt, versuchen Sie es erneut. 🎜🎜🎜🎜Die positionierte Position weist bereits auf einen Wettbewerb zwischen Threads hin > ist false code>, wird in <code>true geändert, berechnet den Hash neu und versucht es erneut 🎜🎜🎜🎜Die positionierte Position hat einen Wert und wasUncontended ist bereits wahr, versuchen Sie, an dieser Position zu akkumulieren🎜🎜🎜🎜Wenn die Akkumulation fehlschlägt, stellen Sie fest, ob die Array-Kapazität das Maximum erreicht hat. Wenn ja, kann sie nicht erweitert werden rehash und versuchen Sie es erneut🎜 🎜🎜🎜Wenn keine der vorherigen Bedingungen erfüllt ist und das Erweiterungsflag collide als false markiert ist, ändern Sie es auf true, was darauf hinweist, dass die Erweiterung durchgeführt werden kann, und dann rehashRetry🎜🎜🎜🎜Versuchen Sie zunächst, die Erweiterung zu sperren ist das 2-fache des vorherigen, und dann wird der ursprüngliche Array-Inhalt in das neue Array kopiert, und die Erweiterung ist abgeschlossen. 🎜🎜🎜

Das obige ist der detaillierte Inhalt vonWie ist der Ausführungsstatus von LongAdder in der gleichzeitigen Java-Programmierung?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen