Home >Java >javaTutorial >How to use atomicity function in Java?
When multiple threads access a class, it does not matter what scheduling method the runtime environment uses or how these processes will alternate execution, and no additional synchronization is required in the calling code Or coordination, this class can show correct behavior, then this class is said to be thread-safe.
Atomicity: Provides mutually exclusive access, and only one thread can access it at the same time It operates
Visibility: Modifications to main memory by one thread can be observed by other threads in a timely manner
Orderliness:A thread observes the order of instruction execution in other threads. Due to the existence of instruction reordering, the observation results are generally messy and disordered
The Atomic package provides many Atomicxxx classes:
They are all implemented by CAS (compareAndSwap) Atomic.
First write a simple example as follows:
@Slf4j public class AtomicExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); } }
You can send that the result of each operation is always the expected result of 5000 we want. Explain that this counting method is thread-safe.
Let’s take a look at the count.incrementAndGet() method. Its first parameter is the object itself, and the second parameter is valueOffset, which is used to record the compiled address of the value itself in the memory. This record is also mainly It is to find the location of the value in the memory during the update operation to facilitate comparison. The third parameter is the constant 1
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; ... 此处省略多个方法... /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } }
AtomicInteger uses an Unsafe class in the source code, which provides a getAndAddInt method. Let’s continue. Click to view its source code:
public final class Unsafe { private static final Unsafe theUnsafe; ....此处省略很多方法及成员变量.... public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public native int getIntVolatile(Object var1, long var2); }
You can see that a do while statement is used for the main implementation. The core of the while statement is to call a compareAndSwapInt() method. It is a native method, which is a low-level method and is not implemented in Java.
Assume that we want to perform an operation of 0 1=0. The following are the values of each parameter in a single thread:
Updated:
The first parameter (var1) of the compareAndSwapInt() method is the current object, which is count in the code example. At this time its value is 0 (expected value). The second value (var2) is the passed valueOffset value, which has a value of 12. The third parameter (var4) is the constant 1. The variable parameter (var5) in the method is the value obtained by calling the underlying getIntVolatile method based on parameter one and parameter two valueOffset. At this time, its value is 0. The goal that compareAndSwapInt() wants to achieve is that for the count object, if the value in the current expected value var1 is the same as the underlying returned value (var5), then update it to the value var5 var4. If different, recycle to get the expected value (var5) until the current value is the same as the expected value and then update. The core of the compareAndSwap method is what we usually call CAS.
The implementation principles of other classes under the Atomic package, such as AtomicLong, are basically the same as above.
Here we introduce the LongAdder class. Through the above analysis, we already know that AtomicLong uses CAS: it continuously tries to modify the target value in an infinite loop until the modification is successful. If the competition is not fierce, the probability of successful modification is very high. On the other hand, if the probability of modification failure is high in a highly competitive situation, it will make multiple loop attempts, so performance will be affected.
For common types of long and double variables, jvm allows a 64-bit read operation or write operation to be split into two 32-bit operations. The core idea of LongAdder is to separate hotspot data. It can separate the AtomicLong internal core data value into an array, and map it to one of the numbers for counting through hash and other algorithms when each thread accesses it. The final counting result is the summation and accumulation of this array. Among them, the hotspot data value will be separated into multiple cells. Each cell maintains its internal value independently. The actual value of the current object is accumulated and synthesized by all cells. . In this way, hot spots are effectively separated and the degree of parallelism is improved. LongAdder is equivalent to dispersing the single-point update pressure to each node on the basis of AtomicLong. In times of low concurrency, direct updates to the base can ensure that the performance is basically consistent with Atomic. In times of high concurrency, performance is improved through decentralization. However, if there are concurrent updates during statistics, it may cause errors in the statistical data.
When the actual concurrency count is high, LongAdder can be used first. AtomicLong can be used first when parallelism is low or accurate values are required, which is more efficient.
The following is a simple demonstration of the simple usage of AtomicReference under the Atomic package:
@Slf4j public class AtomicExample4 { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); count.compareAndSet(0, 1); log.info("count:{}", count.get()); } }
compareAndSet()分别传入的是预期值跟更新值,只有当预期值跟当前值相等时,才会将值更新为更新值;
上面的第一个方法可以将值更新为2,而第二个步中无法将值更新为1。
下面简单介绍下AtomicIntegerFieldUpdater 用法(利用原子性去更新某个类的实例):
@Slf4j public class AtomicExample5 { private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count"); @Getter private volatile int count = 100; public static void main(String[] args) { AtomicExample5 example5 = new AtomicExample5(); if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 1, {}", example5.getCount()); } if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 2, {}", example5.getCount()); } else { log.info("update failed, {}", example5.getCount()); } } }
它可以更新某个类中指定成员变量的值。
注意:修改的成员变量需要用volatile关键字来修饰,并且不能是static描述的字段。
AtomicStampReference这个类它的核心是要解决CAS的ABA问题(CAS操作的时候,其他线程将变量的值A改成了B,接着又改回了A,等线程使用期望值A与当前变量进行比较的时候,发现A变量没有变,于是CAS就将A值进行了交换操作。
实际上该值已经被其他线程改变过)。
ABA问题的解决思路就是每次变量变更的时候,就将版本号加一。
看一下它的一个核心方法compareAndSet():
public class AtomicStampedReference<V> { private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } ... 此处省略多个方法 .... public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } }
可以看到它多了一个stamp的比较,stamp的值是由每次更新的时候进行维护的。
再介绍下AtomicLongArray,它维护了一个数组。在该数组下,我们可以选择性的已原子性操作更新某个索引对应的值。
public class AtomicLongArray implements java.io.Serializable { private static final long serialVersionUID = -2308431214976778248L; private static final Unsafe unsafe = Unsafe.getUnsafe(); ...此处省略.... /** * Atomically sets the element at position {@code i} to the given value * and returns the old value. * * @param i the index * @param newValue the new value * @return the previous value */ public final long getAndSet(int i, long newValue) { return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue); } /** * Atomically sets the element at position {@code i} to the given * updated value if the current value {@code ==} the expected value. * * @param i the index * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int i, long expect, long update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } }
最后再写一个AtomcBoolean的简单使用:
@Slf4j public class AtomicExample6 { private static AtomicBoolean isHappened = new AtomicBoolean(false); // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}", isHappened.get()); } private static void test() { if (isHappened.compareAndSet(false, true)) { log.info("execute"); } } }
The above is the detailed content of How to use atomicity function in Java?. For more information, please follow other related articles on the PHP Chinese website!