分散ロックは分散環境で使用する必要があることは誰もが知っています。では、分散ロックにはどのような特性が必要なのでしょうか?スタンドアロン Redis をロックするにはどうすればよいですか? Redis クラスターのロックの落とし穴は何ですか?心配しないで、Redis 分散ロックのベールを段階的に解き明かしていきましょう。
#分散ロックの特徴どんな状況でも たったひとつスレッドはロックを保持できます。#2. 高可用性
Redis ビデオ チュートリアル3. デッドロック対策
4. むやみにつかまないでください。
5. 再入可能
単一マシンに Redis を実装する方法
コードを直接投稿します
package com.fandf.test.redis; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * redis 单机锁 * * @author fandongfeng * @date 2023/3/29 06:52 */ @Slf4j @Service public class RedisLock { @Resource RedisTemplate<String, Object> redisTemplate; private static final String SELL_LOCK = "kill:"; /** * 模拟秒杀 * * @return 是否成功 */ public String kill() { String productId = "123"; String key = SELL_LOCK + productId; //锁value,解锁时 用来判断当前锁是否是自己加的 String value = IdUtil.fastSimpleUUID(); //加锁 十秒钟过期 防死锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS); if (!flag) { return "加锁失败"; } try { String productKey = "good123"; //获取商品库存 Integer stock = (Integer) redisTemplate.opsForValue().get(productKey); if (stock == null) { //模拟录入数据, 实际应该加载时从数据库读取 redisTemplate.opsForValue().set(productKey, 100); stock = 100; } if (stock <= 0) { return "卖完了,下次早点来吧"; } //扣减库存, 模拟随机卖出数量 int randomInt = RandomUtil.randomInt(1, 10); redisTemplate.opsForValue().decrement(productKey, randomInt); // 修改db,可以丢到队列里慢慢处理 return "成功卖出" + randomInt + "个,库存剩余" + redisTemplate.opsForValue().get(productKey) + "个"; } finally { // //这种方法会存在删除别人加的锁的可能 // redisTemplate.delete(key); // if(value.equals(redisTemplate.opsForValue().get(key))){ // //因为if条件的判断和 delete不是原子性的, // //if条件判断成功后,恰好锁到期自己解锁 // //此时别的线程如果持有锁了,就会把别人的锁删除掉 // redisTemplate.delete(key); // } //使用lua脚本保证判断和删除的原子性 String luaScript = "if (redis.call('get',KEYS[1]) == ARGV[1]) then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end"; redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value); } } }
単体テストを実施し、100 個のスレッドをシミュレートしてフラッシュ キルを同時に実行します
package com.fandf.test.redis; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class SignServiceTest { @Resource RedisLock redisLock; @RepeatedTest(100) @Execution(CONCURRENT) public void redisLock() { String result = redisLock.kill(); if("加锁失败".equals(result)) { }else { System.out.println(result); } } }
3 つのスレッドのみがロックを取得しました
成功卖出5个,库存剩余95个 成功卖出8个,库存剩余87个 成功卖出7个,库存剩余80个
redis ロックの何が問題ですか?
Redisson は分散ロックを実装します
、Set
、Multimap
、SortedSet
、Map
、List
が含まれます, キュー
, BlockingQueue
, Deque
, BlockingDeque
, セマフォ
, Lock
, AtomicLong
、CountDownLatch
、パブリッシュ/サブスクライブ
、ブルーム フィルター
、リモート サービス
、Spring キャッシュ
, Executor サービス
, Live Object サービス
, Scheduler サービス
) Redisson は、Redis を使用する最も簡単で便利な方法を提供します。 Redisson の目的は、ユーザーがビジネス ロジックの処理により集中できるように、Redis からユーザーの関心事の分離 (Separation of Concern) を促進することです。 springboot は Redisson を統合します。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> </dependency>
spring: application: name: test redis: host: 127.0.0.1 port: 6379
package com.fandf.test.redis; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author fandongfeng */ @Component @Slf4j public class RedissonTest { @Resource RedissonClient redissonClient; public void test() { RLock rLock = redissonClient.getLock("anyKey"); //rLock.lock(10, TimeUnit.SECONDS); rLock.lock(); try { // do something } catch (Exception e) { log.error("业务异常", e); } finally { rLock.unlock(); } } }
redison を理解できない友人は、質問せずにはいられないでしょう。
えっ? ロックするときに有効期限を追加する必要はないのですか?これによりデッドロックが発生しますか?ロックを解除するには所有しているかどうかの判断が必要ではないでしょうか?あはは、心配しないでください。リディソンについては段階的に明らかにしていきます。
Redisson lock() ソース コードの追跡
//RedissonLock.class @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
View lock(- 1, null, false);Method
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //获取当前线程id long threadId = Thread.currentThread().getId(); //加锁代码块, 返回锁的失效时间 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }
どのようにロックされているか、つまり tryAcquire メソッドを見てみましょう
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //真假加锁方法 tryAcquireAsync return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //waitTime和leaseTime都是-1,所以走这里 //过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); //跟进去源码发现默认值是30秒, private long lockWatchdogTimeout = 30 * 1000; ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture); ttlRemainingFuture = new CompletableFutureWrapper<>(s); //加锁成功,开启子线程进行续约 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if (ttlRemaining == null) { if (leaseTime > 0) { //如果指定了过期时间,则不续约 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //没指定过期时间,或者小于0,在这里实现锁自动续约 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
上記のコードには、ロックとロックのロジックが含まれていますまずロック コードを見てみましょう
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
これは非常に明確です。Redisson はコマンドのアトミック性を確保するために Lua スクリプトを使用しています。
redis.call('hexists', KEYS[1], ARGV[2]) キー値が存在するかどうかを確認します。
Redis Hexists コマンドは、ハッシュ テーブルの指定されたフィールドが存在するかどうかを確認するために使用されます。
127.0.0.1:6379> hexists 123 uuid (integer) 0 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 1 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 2 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 3 127.0.0.1:6379> hexists 123 uuid (integer) 1 127.0.0.1:6379> hgetall 123 1) "uuid" 2) "3" 127.0.0.1:6379>
protected String getLockName(long threadId) { return id + ":" + threadId; } public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getServiceManager().getId(); this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } //commandExecutor.getServiceManager() 的id默认值 private final String id = UUID.randomUUID().toString();
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); //判断该实例是否加过锁 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { //重入次数+1 oldEntry.addThreadId(threadId); } else { //第一次加锁 entry.addThreadId(threadId); try { //锁续约核心代码 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { //如果线程异常终止,则关闭锁续约线程 cancelExpirationRenewal(threadId); } } } }renewExpiration() メソッドを見てみましょう
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //新建一个线程执行 Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //设置锁过期时间为30秒 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //检查锁是还否存在 if (res) { // reschedule itself 10后调用自己 renewExpiration(); } else { //关闭续约 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //注意上行代码internalLockLeaseTime / 3, //internalLockLeaseTime默认30s,那么也就是10s检查一次 ee.setTimeout(task); } //设置锁过期时间为internalLockLeaseTime 也就是30s lua脚本保证原子性 protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。
我们写个测试类看看
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-2416:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); Thread.sleep(1000000); } }
查看锁的过期时间,及是否续约
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 26 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> ttl 123 (integer) 22 127.0.0.1:6379> ttl 123 (integer) 21 127.0.0.1:6379> ttl 123 (integer) 20 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 28 127.0.0.1:6379>
我们再改改代码,看看是否可重入和字段名称是否和我们预期一致
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); lock.lock(); lock.lock(); //加了三次锁,此时重入次数为3 Thread.sleep(3000); //解锁一次,此时重入次数变为3 lock.unlock(); Thread.sleep(1000000); } }
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "3" 127.0.0.1:6379> 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "2" 127.0.0.1:6379>
我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。
redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?
假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤
- 1.客户端记录当前系统时间,以毫秒为单位
- 2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。- 3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功- 4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
- 5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题
这里有下面几个点需要注意:
更多编程相关知识,请访问:编程视频!!
以上がRedis の分散ロックについて詳しく話しましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。