Maison  >  Article  >  Java  >  Comment utiliser la fonction atomicité en Java ?

Comment utiliser la fonction atomicité en Java ?

王林
王林avant
2023-05-09 16:40:17925parcourir

Thread Safety

Lorsque plusieurs threads accèdent à une classe, quelle que soit la méthode de planification utilisée par l'environnement d'exécution ou la façon dont ces processus alterneront l'exécution, et sans aucune synchronisation ou coordination supplémentaire dans le code appelant, cette classe sera si elle présente des résultats corrects. comportement, la classe est dite thread-safe.

La sécurité des threads se reflète principalement dans les trois aspects suivants

  • Atomicité : Fournit un accès mutuellement exclusif, et un seul thread peut opérer dessus en même temps

  • Visibilité : Un thread est responsable de les principales modifications de la mémoire peuvent être observées par d'autres threads en temps opportun

  • Ordre : Un thread observe l'ordre d'exécution des instructions dans d'autres threads. En raison de l'existence d'une réorganisation des instructions, les résultats d'observation sont généralement désordonnés.

Explication détaillée du package Atomic dans JUC

Le package Atomic fournit de nombreuses classes Atomicxxx :

Comment utiliser la fonction atomicité en Java ?

Ils sont tous des CAS (compareAndSwap) pour atteindre l'atomicité.

Écrivez d'abord un exemple simple comme suit :

@Slf4j
public class AtomicExample1 { 
    // 请求总数
    public static int clientTotal = 5000; 
    // 同时并发执行的线程数
    public static int threadTotal = 200; 
    public static AtomicInteger count = new AtomicInteger(0); 
    public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }
 
    private static void add() {
        count.incrementAndGet();
    }
}

Vous pouvez envoyer que le résultat de chaque opération est toujours le résultat attendu 5000 que nous voulons. Expliquez que cette méthode de comptage est thread-safe.

Jetons un coup d'œil à la méthode count.incrementAndGet(). Son premier paramètre est l'objet lui-même, et le deuxième paramètre est valueOffset, qui est utilisé pour enregistrer l'adresse compilée de la valeur elle-même en mémoire. mise à jour L'opération trouve l'emplacement de la valeur dans la mémoire pour faciliter la comparaison. Le troisième paramètre est la constante 1

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
 
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
 
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
 
    private volatile int value; 
 
    ... 此处省略多个方法...
 
    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

Le code source d'AtomicInteger utilise une classe Unsafe, qui fournit une méthode getAndAddInt. :

public final class Unsafe {
    private static final Unsafe theUnsafe;
 
    ....此处省略很多方法及成员变量.... 
 
 public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); 
        return var5;
    } 
 
 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); 
 public native int getIntVolatile(Object var1, long var2);
}

Vous pouvez voir qu'une instruction do while est utilisée ici pour l'implémentation principale. Le cœur de l’instruction while consiste à appeler une méthode compareAndSwapInt(). Il s'agit d'une méthode native, qui est une méthode de bas niveau et n'est pas implémentée en Java.

Supposons que nous voulions effectuer une opération de 0+1=0. Voici les valeurs de chaque paramètre dans un seul thread :

Comment utiliser la fonction atomicité en Java ?

Comment utiliser la fonction atomicité en Java ?

Mise à jour :

Comment utiliser la fonction atomicité en Java ?

. Méthode compareAndSwapInt() Le premier paramètre (var1) est l'objet actuel, qui est count dans l'exemple de code. A ce moment, sa valeur est 0 (valeur attendue). La deuxième valeur (var2) est la valeur valueOffset transmise, qui a la valeur 12. Le troisième paramètre (var4) est la constante 1. Le paramètre variable (var5) dans la méthode est la valeur obtenue en appelant la méthode getIntVolatile sous-jacente basée sur le paramètre un et le paramètre deux valueOffset. À ce stade, sa valeur est 0. L'objectif que compareAndSwapInt() veut atteindre est que pour l'objet count, si la valeur dans la valeur attendue actuelle var1 est la même que la valeur renvoyée sous-jacente (var5), alors mettez-la à jour avec la valeur var5+var4. Si différent, recyclez pour obtenir la valeur attendue (var5) jusqu'à ce que la valeur actuelle soit la même que la valeur attendue, puis mettez à jour. Le cœur de la méthode compareAndSwap est ce que nous appelons habituellement CAS.

Les principes d'implémentation des autres classes du package Atomic, telles que AtomicLong, sont fondamentalement les mêmes que ci-dessus.

Nous introduisons ici la classe LongAdder. Grâce à l'analyse ci-dessus, nous savons déjà qu'AtomicLong utilise CAS : il essaie en permanence de modifier la valeur cible dans une boucle infinie jusqu'à ce que la modification réussisse. Si la concurrence n’est pas féroce, la probabilité d’une modification réussie est très élevée. D’un autre côté, si la probabilité d’échec d’une modification est élevée dans une situation hautement compétitive, plusieurs tentatives de boucle seront effectuées, ce qui affectera les performances.

Pour les variables longues et doubles ordinaires, jvm permet de diviser une opération de lecture ou d'écriture de 64 bits en deux opérations de 32 bits. L'idée principale de LongAdder est de séparer les données de point d'accès. Il peut séparer la valeur des données de base internes d'AtomicLong dans un tableau et la mapper à l'un des nombres pour compter via le hachage et d'autres algorithmes lorsque chaque thread y accède. Le résultat final du comptage est la sommation et l'accumulation de ce tableau. Parmi eux, la valeur des données du point chaud sera séparée en plusieurs cellules. Chaque cellule conserve sa valeur interne indépendamment. La valeur réelle de l'objet actuel est accumulée et synthétisée par toutes les cellules. . De cette manière, les points chauds sont efficacement séparés et le degré de parallélisme est amélioré. LongAdder équivaut à disperser la pression de mise à jour en un seul point sur chaque nœud sur la base des mises à jour AtomicLong Direct vers la base en cas de faible concurrence peut garantir que les performances sont fondamentalement cohérentes avec Atomic. En période de forte concurrence, les performances sont améliorées grâce à la décentralisation. Cependant, s'il y a des mises à jour simultanées pendant les statistiques, cela peut entraîner des erreurs dans les données statistiques.

Lorsque le nombre réel de concurrences est élevé, LongAdder peut être utilisé en premier. Lorsque le parallélisme est faible ou que des valeurs précises sont requises, AtomicLong peut être utilisé en premier, ce qui est plus efficace.

Ce qui suit est une démonstration simple de l'utilisation simple d'AtomicReference sous le package Atomic :

@Slf4j
public class AtomicExample4 { 
    private static AtomicReference<Integer> count = new AtomicReference<>(0); 
    public static void main(String[] args) {
        count.compareAndSet(0, 2); 
        count.compareAndSet(0, 1); 
        log.info("count:{}", count.get());
    }
}

compareAndSet()分别传入的是预期值跟更新值,只有当预期值跟当前值相等时,才会将值更新为更新值;

上面的第一个方法可以将值更新为2,而第二个步中无法将值更新为1。

下面简单介绍下AtomicIntegerFieldUpdater 用法(利用原子性去更新某个类的实例):

@Slf4j
public class AtomicExample5 { 
    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
 
    @Getter
    private volatile int count = 100; 
    public static void main(String[] args) { 
        AtomicExample5 example5 = new AtomicExample5();
 
        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }
 
        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}

它可以更新某个类中指定成员变量的值。

注意:修改的成员变量需要用volatile关键字来修饰,并且不能是static描述的字段。

AtomicStampReference这个类它的核心是要解决CAS的ABA问题(CAS操作的时候,其他线程将变量的值A改成了B,接着又改回了A,等线程使用期望值A与当前变量进行比较的时候,发现A变量没有变,于是CAS就将A值进行了交换操作。

实际上该值已经被其他线程改变过)。

ABA问题的解决思路就是每次变量变更的时候,就将版本号加一。

看一下它的一个核心方法compareAndSet():

public class AtomicStampedReference<V> { 
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
 
   ... 此处省略多个方法 ....
 
   public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
}

可以看到它多了一个stamp的比较,stamp的值是由每次更新的时候进行维护的。

再介绍下AtomicLongArray,它维护了一个数组。在该数组下,我们可以选择性的已原子性操作更新某个索引对应的值。

public class AtomicLongArray implements java.io.Serializable {
    private static final long serialVersionUID = -2308431214976778248L;
 
    private static final Unsafe unsafe = Unsafe.getUnsafe();
 
    ...此处省略....
 
 
    /**
     * Atomically sets the element at position {@code i} to the given value
     * and returns the old value.
     *
     * @param i the index
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(int i, long newValue) {
        return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue);
    }
 
    /**
     * Atomically sets the element at position {@code i} to the given
     * updated value if the current value {@code ==} the expected value.
     *
     * @param i the index
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int i, long expect, long update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }
}

最后再写一个AtomcBoolean的简单使用:

@Slf4j
public class AtomicExample6 { 
    private static AtomicBoolean isHappened = new AtomicBoolean(false);
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }
 
    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");
        }
    }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer