首頁  >  文章  >  Java  >  怎麼使用Java中的Atomic原子性功能?

怎麼使用Java中的Atomic原子性功能?

王林
王林轉載
2023-05-09 16:40:17924瀏覽

線程安全性

當多個執行緒存取某個類別時,不管運行時環境採用何種調度方式或這些進程將如何交替執行,並且在主調碼中不需要任何額外的同步或協調,這個類別都能表現出正確的行為,那麼就稱這個類別時線程安全的。

線程安全主要體現在以下三個方面

  • 原子性:提供了互斥訪問,同一時刻只能有一個執行緒對它進行操作

  • 可見性:一個執行緒對主記憶體的修改可以及時的被其他執行緒觀察到

  • 有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序

JUC中的Atomic套件詳解

Atomic套件中提供了許多Atomicxxx的類別:

怎麼使用Java中的Atomic原子性功能?

##它們都是CAS(compareAndSwap)來實現原子性。

先寫一個簡單範例如下:

@Slf4j
public class AtomicExample1 { 
    // 请求总数
    public static int clientTotal = 5000; 
    // 同时并发执行的线程数
    public static int threadTotal = 200; 
    public static AtomicInteger count = new AtomicInteger(0); 
    public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }
 
    private static void add() {
        count.incrementAndGet();
    }
}

可以發下每次的運行結果總是我們想要的預期結果5000。說明該計數方法是線程安全的。

我們查看下count.incrementAndGet()方法,它的第一個參數為物件本身,第二個參數為valueOffset是用來記錄value本身在記憶體的編譯位址的,這個記錄,也主要是為了在更新操作在記憶體中找到value的位置,方便比較,第三個參數為常數1

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
 
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
 
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
 
    private volatile int value; 
 
    ... 此处省略多个方法...
 
    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

AtomicInteger源碼裡使用了一個Unsafe的類別,它提供了一個getAndAddInt的方法,我們繼續點看查看它的原始碼:

public final class Unsafe {
    private static final Unsafe theUnsafe;
 
    ....此处省略很多方法及成员变量.... 
 
 public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); 
        return var5;
    } 
 
 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); 
 public native int getIntVolatile(Object var1, long var2);
}

可以看到這裡使用了一個do while語句來做主體實作的。而在while語句裡它的核心是呼叫了一個compareAndSwapInt()的方法。它是一個native方法,它是一個底層的方法,不是用Java來實現的。

假設我們要執行0 1=0的操作,以下是單執行緒情況下各參數的值:

怎麼使用Java中的Atomic原子性功能?

怎麼使用Java中的Atomic原子性功能?

更新後:

怎麼使用Java中的Atomic原子性功能?

#compareAndSwapInt()方法的第一個參數(var1)是目前的對象,就是程式碼範例中的count。此時它的值為0(期望值)。第二個值(var2)是傳遞的valueOffset值,它的值為12。第三個參數(var4)就為常數1。方法中的變數參數(var5)是根據參數一和參數二valueOffset,呼叫底層getIntVolatile方法得到的值,此時它的值為0 。 compareAndSwapInt()想要達到的目標是對於count這個對象,如果當前的期望值var1裡的value跟底層的返回的值(var5)相同的話,那麼把它更新成var5 var4這個值。不同的話重新循環取期望值(var5)直到當前值與期望值相同才做更新。 compareAndSwap方法的核心也就是我們通常所說的CAS。

Atomic套件下其他的類別如AtomicLong等的實作原理基本上與上述一樣。

這裡再介紹下LongAdder這個類,透過上述的分析,我們已經知道了AtomicLong使用CAS:在一個死循環內不斷嘗試修改目標值直到修改成功。如果在競爭不激烈的情況下,它修改成功機率很高。反之,如果在競爭激烈的情況下,修改失敗的機率會很高,它就會進行多次的循環嘗試,因此性能會受到一些影響。

對於普通類型的long和double變量,jvm允許將64位元的讀取操作或寫入操作拆成兩個32位元的操作。 LongAdder的核心思想是將熱點資料分離,它可以將AtomicLong內部核心資料value分離成數組,每個執行緒存取時透過hash等演算法映射到其中一個數字進行計數。而最終的計數結果則為這個數組的求和累加,其中熱點數據value,它會被分離成多個單元的cell,每個cell獨自維護內部的值,當前對象的實際值由所有的cell累計合成。這樣,熱點就進行了有效的分離,提高了並行度。 LongAdder相當於在AtomicLong的基礎上將單點的更新壓力分散到各個節點上,在低並發的時候對base的直接更新可以很好的保障跟Atomic的性能基本一致。而在高並發的時候,透過分散提高了性能。但是如果在統計的時候有並發更新,可能會導致統計的數據有誤差。

在實際高並發計數的時候,可以優先使用LongAdder。在低並行度或需要準確數值的時候可以優先使用AtomicLong,這樣反而效率更高。

下面簡單的示範下Atomic套件下AtomicReference簡單的用法:

@Slf4j
public class AtomicExample4 { 
    private static AtomicReference<Integer> count = new AtomicReference<>(0); 
    public static void main(String[] args) {
        count.compareAndSet(0, 2); 
        count.compareAndSet(0, 1); 
        log.info("count:{}", count.get());
    }
}

compareAndSet()分别传入的是预期值跟更新值,只有当预期值跟当前值相等时,才会将值更新为更新值;

上面的第一个方法可以将值更新为2,而第二个步中无法将值更新为1。

下面简单介绍下AtomicIntegerFieldUpdater 用法(利用原子性去更新某个类的实例):

@Slf4j
public class AtomicExample5 { 
    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
 
    @Getter
    private volatile int count = 100; 
    public static void main(String[] args) { 
        AtomicExample5 example5 = new AtomicExample5();
 
        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }
 
        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}

它可以更新某个类中指定成员变量的值。

注意:修改的成员变量需要用volatile关键字来修饰,并且不能是static描述的字段。

AtomicStampReference这个类它的核心是要解决CAS的ABA问题(CAS操作的时候,其他线程将变量的值A改成了B,接着又改回了A,等线程使用期望值A与当前变量进行比较的时候,发现A变量没有变,于是CAS就将A值进行了交换操作。

实际上该值已经被其他线程改变过)。

ABA问题的解决思路就是每次变量变更的时候,就将版本号加一。

看一下它的一个核心方法compareAndSet():

public class AtomicStampedReference<V> { 
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
 
   ... 此处省略多个方法 ....
 
   public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }
}

可以看到它多了一个stamp的比较,stamp的值是由每次更新的时候进行维护的。

再介绍下AtomicLongArray,它维护了一个数组。在该数组下,我们可以选择性的已原子性操作更新某个索引对应的值。

public class AtomicLongArray implements java.io.Serializable {
    private static final long serialVersionUID = -2308431214976778248L;
 
    private static final Unsafe unsafe = Unsafe.getUnsafe();
 
    ...此处省略....
 
 
    /**
     * Atomically sets the element at position {@code i} to the given value
     * and returns the old value.
     *
     * @param i the index
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(int i, long newValue) {
        return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue);
    }
 
    /**
     * Atomically sets the element at position {@code i} to the given
     * updated value if the current value {@code ==} the expected value.
     *
     * @param i the index
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int i, long expect, long update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }
}

最后再写一个AtomcBoolean的简单使用:

@Slf4j
public class AtomicExample6 { 
    private static AtomicBoolean isHappened = new AtomicBoolean(false);
 
    // 请求总数
    public static int clientTotal = 5000;
 
    // 同时并发执行的线程数
    public static int threadTotal = 200;
 
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }
 
    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");
        }
    }
}

以上是怎麼使用Java中的Atomic原子性功能?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除