Java의 여러 잠금: 동기화, ReentrantLock, ReentrantReadWriteLock은 기본적으로 프로그래밍 요구 사항을 충족할 수 있지만 세분성이 너무 큽니다. 동시에 하나의 스레드만 동기화된 블록에 들어갈 수 있으므로 특정 높은 동시성 시나리오에는 적합하지 않습니다. .
다음은 몇 가지 더 세분화된 잠금을 제공합니다.
1. 분할된 잠금
concurrentHashMap의 분할 아이디어를 활용하여 먼저 특정 개수의 잠금을 생성합니다. 그런 다음 키를 기반으로 해당 잠금을 반환합니다. 이는 여러 구현 중 가장 간단하고 성능이 뛰어나며 최종적으로 채택된 잠금 전략이기도 합니다. 코드는
/** * 分段锁,系统提供一定数量的原始锁,根据传入对象的哈希值获取对应的锁并加锁 * 注意:要锁的对象的哈希值如果发生改变,有可能导致锁无法成功释放!!! */ public class SegmentLock<T> { private Integer segments = 16;//默认分段数量 private final HashMap<Integer, ReentrantLock> lockMap = new HashMap<>(); public SegmentLock() { init(null, false); } public SegmentLock(Integer counts, boolean fair) { init(counts, fair); } private void init(Integer counts, boolean fair) { if (counts != null) { segments = counts; } for (int i = 0; i < segments; i++) { lockMap.put(i, new ReentrantLock(fair)); } } public void lock(T key) { ReentrantLock lock = lockMap.get(key.hashCode() % segments); lock.lock(); } public void unlock(T key) { ReentrantLock lock = lockMap.get(key.hashCode() % segments); lock.unlock(); } }
2. Hash lock
을 기반으로 개발되었습니다. 위의 분할된 잠금 중 두 번째 잠금 전략은 진정한 세분화된 잠금을 달성하는 것을 목표로 합니다. 서로 다른 해시 값을 가진 각 객체는 자체적인 독립적인 잠금을 얻을 수 있습니다. 테스트에서 잠긴 코드가 매우 빠르게 실행될 때 효율성은 세그먼트 잠금보다 약 30% 느렸습니다. 장기적인 운영이 있다면 성능이 더 좋아질 것 같은 느낌이 듭니다. 코드는 다음과 같습니다.
public class HashLock<T> { private boolean isFair = false; private final SegmentLock<T> segmentLock = new SegmentLock<>();//分段锁 private final ConcurrentHashMap<T, LockInfo> lockMap = new ConcurrentHashMap<>(); public HashLock() { } public HashLock(boolean fair) { isFair = fair; } public void lock(T key) { LockInfo lockInfo; segmentLock.lock(key); try { lockInfo = lockMap.get(key); if (lockInfo == null) { lockInfo = new LockInfo(isFair); lockMap.put(key, lockInfo); } else { lockInfo.count.incrementAndGet(); } } finally { segmentLock.unlock(key); } lockInfo.lock.lock(); } public void unlock(T key) { LockInfo lockInfo = lockMap.get(key); if (lockInfo.count.get() == 1) { segmentLock.lock(key); try { if (lockInfo.count.get() == 1) { lockMap.remove(key); } } finally { segmentLock.unlock(key); } } lockInfo.count.decrementAndGet(); lockInfo.unlock(); } private static class LockInfo { public ReentrantLock lock; public AtomicInteger count = new AtomicInteger(1); private LockInfo(boolean fair) { this.lock = new ReentrantLock(fair); } public void lock() { this.lock.lock(); } public void unlock() { this.lock.unlock(); } } }
3. 약한 참조 잠금
해시 잠금은 잠금 생성 및 소멸의 동기화를 보장하기 위해 도입되었기 때문에 항상 약간 결함이 있는 것처럼 느껴집니다. 그래서 더 나은 성능과 더 세분화된 잠금을 추구하기 위해 세 번째 잠금을 작성했습니다. 이 잠금의 아이디어는 Java의 약한 참조를 사용하여 잠금을 생성하고 추가 소비를 피하기 위해 잠금 파괴를 JVM의 가비지 수집에 넘겨주는 것입니다.
ConcurrentHashMap을 잠금 컨테이너로 사용하기 때문에 분할 잠금을 실제로 제거할 수 없다는 점은 아쉽습니다. 이 잠금의 성능은 HashLock보다 약 10% 빠릅니다. 잠금 코드:
/** * 弱引用锁,为每个独立的哈希值提供独立的锁功能 */ public class WeakHashLock<T> { private ConcurrentHashMap<T, WeakLockRef<T, ReentrantLock>> lockMap = new ConcurrentHashMap<>(); private ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>(); public ReentrantLock get(T key) { if (lockMap.size() > 1000) { clearEmptyRef(); } WeakReference<ReentrantLock> lockRef = lockMap.get(key); ReentrantLock lock = (lockRef == null ? null : lockRef.get()); while (lock == null) { lockMap.putIfAbsent(key, new WeakLockRef<>(new ReentrantLock(), queue, key)); lockRef = lockMap.get(key); lock = (lockRef == null ? null : lockRef.get()); if (lock != null) { return lock; } clearEmptyRef(); } return lock; } @SuppressWarnings("unchecked") private void clearEmptyRef() { Reference<? extends ReentrantLock> ref; while ((ref = queue.poll()) != null) { WeakLockRef<T, ? extends ReentrantLock> weakLockRef = (WeakLockRef<T, ? extends ReentrantLock>) ref; lockMap.remove(weakLockRef.key); } } private static final class WeakLockRef<T, K> extends WeakReference<K> { final T key; private WeakLockRef(K referent, ReferenceQueue<? super K> q, T key) { super(referent, q); this.key = key; } } }
4. KEY(기본 키) 기반의 Mutex 잠금
KeyLock은 해당 데이터의 KEY(기본 키)에 대해 수행됩니다. 잠금은 다른 키에서 작동하는 한 병렬로 처리될 수 있으므로 스레드의 병렬 처리가 크게 향상됩니다.
KeyLock에는 다음이 있습니다. 특징 :
1. 세밀하고 높은 병렬성
2. 재진입
3. 공정한 잠금
4. 잠금 오버헤드가 ReentrantLock보다 커서 처리 시간이 길고 키 범위가 긴 경우에 적합 큰 장면
public class KeyLock<K> { // 保存所有锁定的KEY及其信号量 private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>(); // 保存每个线程锁定的KEY及其锁定计数 private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() { @Override protected Map<K, LockInfo> initialValue() { return new HashMap<K, LockInfo>(); } }; /** * 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)} * 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和 * {@link #equals(Object)}方法 * * @param key */ public void lock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info == null) { Semaphore current = new Semaphore(1); current.acquireUninterruptibly(); Semaphore previous = map.put(key, current); if (previous != null) previous.acquireUninterruptibly(); local.get().put(key, new LockInfo(current)); } else { info.lockCount++; } } /** * 释放key,唤醒其他等待此key的线程 * @param key */ public void unlock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info != null && --info.lockCount == 0) { info.current.release(); map.remove(key, info.current); local.get().remove(key); } } /** * 锁定多个key * 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生 * @param keys */ public void lock(K[] keys) { if (keys == null) return; for (K key : keys) { lock(key); } } /** * 释放多个key * @param keys */ public void unlock(K[] keys) { if (keys == null) return; for (K key : keys) { unlock(key); } } private static class LockInfo { private final Semaphore current; private int lockCount; private LockInfo(Semaphore current) { this.current = current; this.lockCount = 1; } } }
KeyLock 사용 예:
private int[] accounts; private KeyLock<Integer> lock = new KeyLock<Integer>(); public boolean transfer(int from, int to, int money) { Integer[] keys = new Integer[] {from, to}; Arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁 lock.lock(keys); try { //处理不同的from和to的线程都可进入此同步块 if (accounts[from] < money) return false; accounts[from] -= money; accounts[to] += money; return true; } finally { lock.unlock(keys); } }
테스트 코드는 다음과 같습니다.
//场景:多线程并发转账 public class Test { private final int[] account; // 账户数组,其索引为账户ID,内容为金额 public Test(int count, int money) { account = new int[count]; Arrays.fill(account, money); } boolean transfer(int from, int to, int money) { if (account[from] < money) return false; account[from] -= money; try { Thread.sleep(2); } catch (Exception e) { } account[to] += money; return true; } int getAmount() { int result = 0; for (int m : account) result += m; return result; } public static void main(String[] args) throws Exception { int count = 100; //账户个数 int money = 10000; //账户初始金额 int threadNum = 8; //转账线程数 int number = 10000; //转账次数 int maxMoney = 1000; //随机转账最大金额 Test test = new Test(count, money); //不加锁 // Runner runner = test.new NonLockRunner(maxMoney, number); //加synchronized锁 // Runner runner = test.new SynchronizedRunner(maxMoney, number); //加ReentrantLock锁 // Runner runner = test.new ReentrantLockRunner(maxMoney, number); //加KeyLock锁 Runner runner = test.new KeyLockRunner(maxMoney, number); Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threadNum; i++) threads[i] = new Thread(runner, "thread-" + i); long begin = System.currentTimeMillis(); for (Thread t : threads) t.start(); for (Thread t : threads) t.join(); long time = System.currentTimeMillis() - begin; System.out.println("类型:" + runner.getClass().getSimpleName()); System.out.printf("耗时:%dms\n", time); System.out.printf("初始总金额:%d\n", count * money); System.out.printf("终止总金额:%d\n", test.getAmount()); } // 转账任务 abstract class Runner implements Runnable { final int maxMoney; final int number; private final Random random = new Random(); private final AtomicInteger count = new AtomicInteger(); Runner(int maxMoney, int number) { this.maxMoney = maxMoney; this.number = number; } @Override public void run() { while(count.getAndIncrement() < number) { int from = random.nextInt(account.length); int to; while ((to = random.nextInt(account.length)) == from) ; int money = random.nextInt(maxMoney); doTransfer(from, to, money); } } abstract void doTransfer(int from, int to, int money); } // 不加锁的转账 class NonLockRunner extends Runner { NonLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { transfer(from, to, money); } } // synchronized的转账 class SynchronizedRunner extends Runner { SynchronizedRunner(int maxMoney, int number) { super(maxMoney, number); } @Override synchronized void doTransfer(int from, int to, int money) { transfer(from, to, money); } } // ReentrantLock的转账 class ReentrantLockRunner extends Runner { private final ReentrantLock lock = new ReentrantLock(); ReentrantLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { lock.lock(); try { transfer(from, to, money); } finally { lock.unlock(); } } } // KeyLock的转账 class KeyLockRunner extends Runner { private final KeyLock<Integer> lock = new KeyLock<Integer>(); KeyLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { Integer[] keys = new Integer[] {from, to}; Arrays.sort(keys); lock.lock(keys); try { transfer(from, to, money); } finally { lock.unlock(keys); } } } }
테스트 결과:
( 스레드 8개 100개 계정에 총 10,000번 무작위 전송):
유형: NonLockRunner(잠금 해제)
소요 시간: 2482ms
초기 총 금액: 1000000
총 종료 금액: 998906(불가) 원자성 보장)
유형: 동기화된 Runner(동기화된 잠금 포함)
시간 소비: 20872ms
초기 총량: 1000000
총 종료량: 1000000
유형: ReentrantLockRunner (ReentrantLock 추가)
소모 시간: 21588ms
초기 총량: 1000000
총 종료량: 1000000
유형: KeyLockRunner(KeyLock 추가)
소요 시간: 2831ms
최초 총액: 1000000
총 해지 금액: 1000000