Plusieurs verrous en Java : synchronisé, ReentrantLock, ReentrantReadWriteLock peuvent essentiellement répondre aux besoins de programmation, mais leur granularité est trop grande. Un seul thread peut entrer dans le bloc synchronisé en même temps, ce qui ne convient pas à certains scénarios à forte concurrence. .
Ce qui suit fournit plusieurs verrous plus fins :
1 Verrous segmentés
S'inspirer de l'idée de segmentation de ConcurrentHashMap, générez d'abord un certain nombre de verrous, et utilisez-les spécifiquement. Ensuite, renvoyez le verrou correspondant en fonction de la clé. C'est la plus simple et la plus performante parmi plusieurs implémentations, et c'est aussi la stratégie de verrouillage qui a finalement été adoptée. Le code est le suivant :
/** * 分段锁,系统提供一定数量的原始锁,根据传入对象的哈希值获取对应的锁并加锁 * 注意:要锁的对象的哈希值如果发生改变,有可能导致锁无法成功释放!!! */ public class SegmentLock<T> { private Integer segments = 16;//默认分段数量 private final HashMap<Integer, ReentrantLock> lockMap = new HashMap<>(); public SegmentLock() { init(null, false); } public SegmentLock(Integer counts, boolean fair) { init(counts, fair); } private void init(Integer counts, boolean fair) { if (counts != null) { segments = counts; } for (int i = 0; i < segments; i++) { lockMap.put(i, new ReentrantLock(fair)); } } public void lock(T key) { ReentrantLock lock = lockMap.get(key.hashCode() % segments); lock.lock(); } public void unlock(T key) { ReentrantLock lock = lockMap.get(key.hashCode() % segments); lock.unlock(); } }
2. le verrou segmenté ci-dessus La deuxième stratégie de verrouillage développée consiste à obtenir un véritable verrouillage à granularité fine. Chaque objet avec une valeur de hachage différente peut obtenir son propre verrou indépendant. Lors des tests, lorsque le code verrouillé s'exécute très rapidement, l'efficacité est environ 30 % plus lente que le verrouillage segmenté. S'il y a des opérations à long terme, il semble que les performances devraient être meilleures. Le code est le suivant :
3. Verrou de référence faiblepublic class HashLock<T> { private boolean isFair = false; private final SegmentLock<T> segmentLock = new SegmentLock<>();//分段锁 private final ConcurrentHashMap<T, LockInfo> lockMap = new ConcurrentHashMap<>(); public HashLock() { } public HashLock(boolean fair) { isFair = fair; } public void lock(T key) { LockInfo lockInfo; segmentLock.lock(key); try { lockInfo = lockMap.get(key); if (lockInfo == null) { lockInfo = new LockInfo(isFair); lockMap.put(key, lockInfo); } else { lockInfo.count.incrementAndGet(); } } finally { segmentLock.unlock(key); } lockInfo.lock.lock(); } public void unlock(T key) { LockInfo lockInfo = lockMap.get(key); if (lockInfo.count.get() == 1) { segmentLock.lock(key); try { if (lockInfo.count.get() == 1) { lockMap.remove(key); } } finally { segmentLock.unlock(key); } } lockInfo.count.decrementAndGet(); lockInfo.unlock(); } private static class LockInfo { public ReentrantLock lock; public AtomicInteger count = new AtomicInteger(1); private LockInfo(boolean fair) { this.lock = new ReentrantLock(fair); } public void lock() { this.lock.lock(); } public void unlock() { this.lock.unlock(); } } }
Hash Lock car le verrou segmenté est introduit pour assurer la synchronisation de la création et de la destruction du verrou, cela semble toujours un peu imparfait. , j'ai donc écrit les trois premiers verrous pour rechercher de meilleures performances et des verrous plus fins. L'idée de ce verrou est d'utiliser la référence faible de Java pour créer le verrou et de confier la destruction du verrou au garbage collection de la JVM pour éviter une consommation supplémentaire.
Il est dommage que, comme ConcurrentHashMap est utilisé comme conteneur de verrouillage, il ne puisse pas vraiment se débarrasser du verrou de segmentation. Les performances de ce verrou sont environ 10 % plus rapides que celles de HashLock. Code de verrouillage :
/** * 弱引用锁,为每个独立的哈希值提供独立的锁功能 */ public class WeakHashLock<T> { private ConcurrentHashMap<T, WeakLockRef<T, ReentrantLock>> lockMap = new ConcurrentHashMap<>(); private ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>(); public ReentrantLock get(T key) { if (lockMap.size() > 1000) { clearEmptyRef(); } WeakReference<ReentrantLock> lockRef = lockMap.get(key); ReentrantLock lock = (lockRef == null ? null : lockRef.get()); while (lock == null) { lockMap.putIfAbsent(key, new WeakLockRef<>(new ReentrantLock(), queue, key)); lockRef = lockMap.get(key); lock = (lockRef == null ? null : lockRef.get()); if (lock != null) { return lock; } clearEmptyRef(); } return lock; } @SuppressWarnings("unchecked") private void clearEmptyRef() { Reference<? extends ReentrantLock> ref; while ((ref = queue.poll()) != null) { WeakLockRef<T, ? extends ReentrantLock> weakLockRef = (WeakLockRef<T, ? extends ReentrantLock>) ref; lockMap.remove(weakLockRef.key); } } private static final class WeakLockRef<T, K> extends WeakReference<K> { final T key; private WeakLockRef(K referent, ReferenceQueue<? super K> q, T key) { super(referent, q); this.key = key; } } }4. Verrouillage mutex basé sur la clé (clé primaire)
KeyLock est la clé (clé primaire) des données qui doit être traité ) pour verrouiller, tant que différentes opérations de clé sont effectuées, elles peuvent être traitées en parallèle, améliorant considérablement le parallélisme des threads
KeyLock a les fonctionnalités suivantes :
1. Parallélisme à grain fin et élevé
2. Réentrant 3. Verrouillage équitable
4. La surcharge de verrouillage est plus grande que ReentrantLock, ce qui convient pour un traitement à long terme, Scénarios avec une large gamme de clés
public class KeyLock<K> { // 保存所有锁定的KEY及其信号量 private final ConcurrentMap<K, Semaphore> map = new ConcurrentHashMap<K, Semaphore>(); // 保存每个线程锁定的KEY及其锁定计数 private final ThreadLocal<Map<K, LockInfo>> local = new ThreadLocal<Map<K, LockInfo>>() { @Override protected Map<K, LockInfo> initialValue() { return new HashMap<K, LockInfo>(); } }; /** * 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)} * 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和 * {@link #equals(Object)}方法 * * @param key */ public void lock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info == null) { Semaphore current = new Semaphore(1); current.acquireUninterruptibly(); Semaphore previous = map.put(key, current); if (previous != null) previous.acquireUninterruptibly(); local.get().put(key, new LockInfo(current)); } else { info.lockCount++; } } /** * 释放key,唤醒其他等待此key的线程 * @param key */ public void unlock(K key) { if (key == null) return; LockInfo info = local.get().get(key); if (info != null && --info.lockCount == 0) { info.current.release(); map.remove(key, info.current); local.get().remove(key); } } /** * 锁定多个key * 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生 * @param keys */ public void lock(K[] keys) { if (keys == null) return; for (K key : keys) { lock(key); } } /** * 释放多个key * @param keys */ public void unlock(K[] keys) { if (keys == null) return; for (K key : keys) { unlock(key); } } private static class LockInfo { private final Semaphore current; private int lockCount; private LockInfo(Semaphore current) { this.current = current; this.lockCount = 1; } } }
Exemple d'utilisation de KeyLock :
Le code de test est le suivant :private int[] accounts; private KeyLock<Integer> lock = new KeyLock<Integer>(); public boolean transfer(int from, int to, int money) { Integer[] keys = new Integer[] {from, to}; Arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁 lock.lock(keys); try { //处理不同的from和to的线程都可进入此同步块 if (accounts[from] < money) return false; accounts[from] -= money; accounts[to] += money; return true; } finally { lock.unlock(keys); } }Résultats des tests :
//场景:多线程并发转账 public class Test { private final int[] account; // 账户数组,其索引为账户ID,内容为金额 public Test(int count, int money) { account = new int[count]; Arrays.fill(account, money); } boolean transfer(int from, int to, int money) { if (account[from] < money) return false; account[from] -= money; try { Thread.sleep(2); } catch (Exception e) { } account[to] += money; return true; } int getAmount() { int result = 0; for (int m : account) result += m; return result; } public static void main(String[] args) throws Exception { int count = 100; //账户个数 int money = 10000; //账户初始金额 int threadNum = 8; //转账线程数 int number = 10000; //转账次数 int maxMoney = 1000; //随机转账最大金额 Test test = new Test(count, money); //不加锁 // Runner runner = test.new NonLockRunner(maxMoney, number); //加synchronized锁 // Runner runner = test.new SynchronizedRunner(maxMoney, number); //加ReentrantLock锁 // Runner runner = test.new ReentrantLockRunner(maxMoney, number); //加KeyLock锁 Runner runner = test.new KeyLockRunner(maxMoney, number); Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threadNum; i++) threads[i] = new Thread(runner, "thread-" + i); long begin = System.currentTimeMillis(); for (Thread t : threads) t.start(); for (Thread t : threads) t.join(); long time = System.currentTimeMillis() - begin; System.out.println("类型:" + runner.getClass().getSimpleName()); System.out.printf("耗时:%dms\n", time); System.out.printf("初始总金额:%d\n", count * money); System.out.printf("终止总金额:%d\n", test.getAmount()); } // 转账任务 abstract class Runner implements Runnable { final int maxMoney; final int number; private final Random random = new Random(); private final AtomicInteger count = new AtomicInteger(); Runner(int maxMoney, int number) { this.maxMoney = maxMoney; this.number = number; } @Override public void run() { while(count.getAndIncrement() < number) { int from = random.nextInt(account.length); int to; while ((to = random.nextInt(account.length)) == from) ; int money = random.nextInt(maxMoney); doTransfer(from, to, money); } } abstract void doTransfer(int from, int to, int money); } // 不加锁的转账 class NonLockRunner extends Runner { NonLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { transfer(from, to, money); } } // synchronized的转账 class SynchronizedRunner extends Runner { SynchronizedRunner(int maxMoney, int number) { super(maxMoney, number); } @Override synchronized void doTransfer(int from, int to, int money) { transfer(from, to, money); } } // ReentrantLock的转账 class ReentrantLockRunner extends Runner { private final ReentrantLock lock = new ReentrantLock(); ReentrantLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { lock.lock(); try { transfer(from, to, money); } finally { lock.unlock(); } } } // KeyLock的转账 class KeyLockRunner extends Runner { private final KeyLock<Integer> lock = new KeyLock<Integer>(); KeyLockRunner(int maxMoney, int number) { super(maxMoney, number); } @Override void doTransfer(int from, int to, int money) { Integer[] keys = new Integer[] {from, to}; Arrays.sort(keys); lock.lock(keys); try { transfer(from, to, money); } finally { lock.unlock(keys); } } } }
(Un total de 10 000 transferts aléatoires vers 100 comptes par 8 threads) :
Type : NonLockRunner (débloqué)
Consommation de temps : 2482 ms Montant total initial : 1 000 000
Montant total de terminaison : 998906 (l'atomicité ne peut pas être garantie)
Type : SynchronizedRunner (plus verrouillage synchronisé)
Consommation de temps : 20872 ms
Montant total initial : 1 000 000
Montant total de résiliation : 1000000
Type : ReentrantLockRunner (plus le verrou ReentrantLock)
Temps passé : 21588 ms
Montant total initial : 1000000
Montant total de résiliation : 1000000
Type : KeyLockRunner (plus KeyLock)
Temps pris : 2831 ms
Montant total initial : 1000000
Montant total de résiliation : 1000000