Maison  >  Article  >  Java  >  Quel est l’état d’exécution de LongAdder dans la programmation simultanée Java ?

Quel est l’état d’exécution de LongAdder dans la programmation simultanée Java ?

WBOY
WBOYavant
2023-05-09 12:52:15718parcourir

Méthode longAccumulate

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

Le code est long, analysons-le par sections, introduisons d'abord le contenu de chaque partie

  • La première partie : pourLe code avant la boucle, principalement pour obtenir le valeur de hachage du thread, si elle est 0, forcer l'initialisationfor循环之前的代码,主要是获取线程的hash值,如果是0的话就强制初始化

  • 第二部分:for循环中第一个if语句,在Cell数组中进行累加、扩容

  • 第三部分:for循环中第一个else if语句,这部分的作用是创建Cell数组并初始化

  • 第四部分:for循环中第二个else if语句,当Cell数组竞争激烈时尝试在base上进行累加

线程hash值

int h; 
if ((h = getProbe()) == 0) { 
    ThreadLocalRandom.current(); // force initialization 
    h = getProbe(); 
    wasUncontended = true;   // true表示没有竞争
} 
boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容

这部分的核心代码是getProbe方法,这个方法的作用就是获取线程的hash值,方便后面通过位运算定位到Cell数组中某个位置,如果是0的话就会进行强制初始化

初始化Cell数组

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 省略...
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {  // 获取锁
            boolean init = false;  // 初始化标志
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];  // 创建Cell数组
                    rs[h & 1] = new Cell(x);  // 索引1位置创建Cell元素,值为x=1
                    cells = rs;   // cells指向新数组
                    init = true;  // 初始化完成
                }
            } finally {
                cellsBusy = 0;  // 释放锁
            }
            if (init)
                break;  // 跳出循环
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

第一种情况下Cell数组为null,所以会进入第一个else if语句,并且没有其他线程进行操作,所以cellsBusy==0cells==as也是truecasCellsBusy()尝试对cellsBusy进行cas操作改成1也是true

首先创建了一个有两个元素的Cell数组,然后通过线程h & 1 的位运算在索引1的位置设置一个value1Cell,然后重新赋值给cells,标记初始化成功,修改cellsBusy0表示释放锁,最后跳出循环,初始化操作就完成了。

对base进行累加

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 省略...
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 省略...
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

第二个else if语句的意思是当Cell数组中所有位置竞争都很激烈时,就尝试在base上进行累加,可以理解为最后的保障

Cell数组初始化之后

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 省略...
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {  // as初始化之后满足条件
            if ((a = as[(n - 1) & h]) == null) {  // as中某个位置的值为null
                if (cellsBusy == 0) {       // Try to attach new Cell 是否加锁
                    Cell r = new Cell(x);   // Optimistically create 创建新Cell
                    if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁
                        boolean created = false;  // 
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {  // 重新检查该位置是否为null
                                rs[j] = r;  // 该位置添加Cell元素
                                created = true;  // 新Cell创建成功
                            }
                        } finally {
                            cellsBusy = 0;  // 释放锁
                        }
                        if (created)
                            break;  // 创建成功,跳出循环
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;  // 扩容标志
            }
            else if (!wasUncontended)       // 上面定位到的索引位置的值不为null
                wasUncontended = true;      // 重新计算hash,重新定位其他索引位置重试
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))  // 尝试在该索引位置进行累加
                break;
            else if (n >= NCPU || cells != as)  // 如果数组长度大于等于CPU核心数,就不能在扩容
                collide = false;            // At max size or stale
            else if (!collide)  // 数组长度没有达到最大值,修改扩容标志可以扩容
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];  // 创建一个原来长度2倍的数组
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];  // 把原来的元素拷贝到新数组中
                        cells = rs;  // cells指向新数组
                    }
                } finally {
                    cellsBusy = 0;  // 释放锁
                }
                collide = false;  // 已经扩容完成,修改标志不用再扩容
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);  // 重新获取hash值
        }
        // 省略...
}

根据代码中的注释分析一遍整体逻辑

  • 首先如果找到数组某个位置上的值为null,说明可以在这个位置进行操作,就创建一个新的Cell并初始化值为1放到这个位置,如果失败了就重新计算hash值再重试

  • 定位到的位置已经有值了,说明线程之间产生了竞争,如果wasUncontendedfalse就修改为true,并重新计算hash重试

  • 定位的位置有值并且wasUncontended已经是true,就尝试在该位置进行累加

  • 当累加失败时,判断数组容量是否已经达到最大,如果是就不能进行扩容,只能rehash并重试

  • 如果前面条件都不满足,并且扩容标志collide标记为false的话就修改为true,表示可以进行扩容,然后rehash重试

  • 首先尝试加锁,成功了就进行扩容操作,每次扩容长度是之前的2

Partie 2 : La première instruction if dans la boucle for, dans la Cell Accumuler et développer 🎜🎜🎜🎜Partie 3 : La première instruction <code>else if dans la boucle for La fonction de cette partie est de créer un . Cell Arrayez et initialisez 🎜🎜🎜🎜Partie 4 : La deuxième instruction else if dans la boucle for, lorsque la compétition pour la Cell code> est féroce, essayez d'accumuler sur <code>base🎜🎜🎜🎜Valeur de hachage du fil🎜rrreee🎜Le code principal de cette partie est la méthode getProbe. La méthode consiste à obtenir la valeur hash du thread, il est pratique de localiser une certaine position dans le tableau Cell via des opérations sur bits plus tard si c'est 0. >, une initialisation forcée sera effectuée🎜🎜Initialisez le tableau Cell🎜rrreee🎜 Dans le premier cas, le tableau Cell est null, donc le premier else if sera entrée et aucun autre thread ne fonctionnera, donc <code>cellsBusy==0, cells==as est également true, casCellsBusy() essaie de cellsBusy code>Effectuer l'opération cas et de la remplacer par 1 qui est également vrai. 🎜🎜Créez d'abord un tableau Cell avec deux éléments, puis définissez-le à la position de l'index 1 via l'opération binaire du thread h & 1 Une Cellule dont la valeur est 1, puis réaffectée aux cellules, marque l'initialisation avec succès et modifie le cellsBusy est 0 pour libérer le verrou, enfin sortir de la boucle et l'opération d'initialisation est terminée. 🎜🎜Accumulate base🎜rrreee🎜La deuxième instruction else if signifie que lorsque la concurrence pour toutes les positions dans le tableau Cell est féroce, essayez d'accumuler sur la base être compris comme la garantie finale🎜🎜Une fois le tableau de cellules initialisé🎜rrreee🎜analyser la logique globale en fonction des commentaires dans le code🎜🎜🎜🎜Tout d'abord, si la valeur à une certaine position dans le tableau s'avère être null, indiquant que l'opération peut être effectuée à cet emplacement, créez une nouvelle Cell et initialisez la valeur à 1 et placez-la à cet emplacement . Si cela échoue, réessayez. Calculez la valeur du hash et réessayez 🎜🎜🎜🎜La position positionnée a déjà une valeur, indiquant qu'il y a une concurrence entre les threads. > est false code> est remplacé par <code>true, recalcule le hash et réessaye 🎜🎜🎜🎜La position positionnée a une valeur et wasUncontended est déjà vrai, essayez d'accumuler à cette position🎜🎜🎜🎜Lorsque l'accumulation échoue, déterminez si la capacité du tableau a atteint le maximum. Si c'est le cas, elle ne peut pas être étendue et peut seulement. répétez et réessayez🎜 🎜🎜🎜Si aucune des conditions précédentes n'est remplie et que l'indicateur d'expansion collide est marqué comme false, changez-le à true, indiquant que l'expansion peut être effectuée, puis rehashRéessayer🎜🎜🎜🎜Essayez d'abord de verrouiller en cas de succès, l'opération d'expansion est effectuée à chaque longueur. est 2 fois le précédent, puis le contenu du tableau d'origine est copié dans Nouveau tableau, l'expansion est terminée. 🎜🎜🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer