我們都知道分散式環境下要使用分散式鎖定才行。那麼分散式鎖都需要有哪些特性呢?單機redis怎麼加鎖? redis集群加鎖有哪些坑呢?別急,下面我們一步步解開Redis分散式鎖的面紗。
不論在任何情況下都只能有一個線程持有鎖。
redis叢集環境不能因為某一個節點宕機而出現取得鎖定或釋放鎖定失敗。 【相關建議:Redis影片教學】
必須有超時控制機製或撤銷操作。
自己加鎖,自己釋放。不能釋放別人加的鎖。
同一執行緒可以多次加鎖。
一般情況下都是使用setnx lua腳本實作。
直接貼程式碼
package com.fandf.test.redis; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * redis 单机锁 * * @author fandongfeng * @date 2023/3/29 06:52 */ @Slf4j @Service public class RedisLock { @Resource RedisTemplate<String, Object> redisTemplate; private static final String SELL_LOCK = "kill:"; /** * 模拟秒杀 * * @return 是否成功 */ public String kill() { String productId = "123"; String key = SELL_LOCK + productId; //锁value,解锁时 用来判断当前锁是否是自己加的 String value = IdUtil.fastSimpleUUID(); //加锁 十秒钟过期 防死锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS); if (!flag) { return "加锁失败"; } try { String productKey = "good123"; //获取商品库存 Integer stock = (Integer) redisTemplate.opsForValue().get(productKey); if (stock == null) { //模拟录入数据, 实际应该加载时从数据库读取 redisTemplate.opsForValue().set(productKey, 100); stock = 100; } if (stock <= 0) { return "卖完了,下次早点来吧"; } //扣减库存, 模拟随机卖出数量 int randomInt = RandomUtil.randomInt(1, 10); redisTemplate.opsForValue().decrement(productKey, randomInt); // 修改db,可以丢到队列里慢慢处理 return "成功卖出" + randomInt + "个,库存剩余" + redisTemplate.opsForValue().get(productKey) + "个"; } finally { // //这种方法会存在删除别人加的锁的可能 // redisTemplate.delete(key); // if(value.equals(redisTemplate.opsForValue().get(key))){ // //因为if条件的判断和 delete不是原子性的, // //if条件判断成功后,恰好锁到期自己解锁 // //此时别的线程如果持有锁了,就会把别人的锁删除掉 // redisTemplate.delete(key); // } //使用lua脚本保证判断和删除的原子性 String luaScript = "if (redis.call('get',KEYS[1]) == ARGV[1]) then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end"; redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value); } } }
進行單元測試,模擬一百個執行緒同時進行秒殺
package com.fandf.test.redis; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class SignServiceTest { @Resource RedisLock redisLock; @RepeatedTest(100) @Execution(CONCURRENT) public void redisLock() { String result = redisLock.kill(); if("加锁失败".equals(result)) { }else { System.out.println(result); } } }
只有三個執行緒搶到了鎖
成功卖出5个,库存剩余95个 成功卖出8个,库存剩余87个 成功卖出7个,库存剩余80个
總的來說有兩個:
那麼這兩個問題有沒有辦法解決呢?有,接下來我們就來講Redisson
Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory Data Grid)。它不僅提供了一系列的分散式的Java常用對象,還提供了許多分散式服務。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
#, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, ##AtomicLong
,
CountDownLatch,
Publish / Subscribe,
Bloom filter,
Remote service,
Spring cache,
Executor service,
Live Object service,
Scheduler service
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> </dependency>
spring: application: name: test redis: host: 127.0.0.1 port: 6379使用也很簡單,只需要注入RedissonClient即可
package com.fandf.test.redis; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author fandongfeng */ @Component @Slf4j public class RedissonTest { @Resource RedissonClient redissonClient; public void test() { RLock rLock = redissonClient.getLock("anyKey"); //rLock.lock(10, TimeUnit.SECONDS); rLock.lock(); try { // do something } catch (Exception e) { log.error("业务异常", e); } finally { rLock.unlock(); } } }
//RedissonLock.class @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }#查看lock(- 1, null, false);方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //获取当前线程id long threadId = Thread.currentThread().getId(); //加锁代码块, 返回锁的失效时间 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }我們看下它是怎麼上鎖的,也就是tryAcquire方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //真假加锁方法 tryAcquireAsync return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //waitTime和leaseTime都是-1,所以走这里 //过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); //跟进去源码发现默认值是30秒, private long lockWatchdogTimeout = 30 * 1000; ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture); ttlRemainingFuture = new CompletableFutureWrapper<>(s); //加锁成功,开启子线程进行续约 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if (ttlRemaining == null) { if (leaseTime > 0) { //如果指定了过期时间,则不续约 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //没指定过期时间,或者小于0,在这里实现锁自动续约 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }上面程式碼裡麵包含加鎖和鎖續約的邏輯,我們先來看看加鎖的程式碼
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
Redis Hexists 指令用於查看哈希表的指定欄位是否存在。
127.0.0.1:6379> hexists 123 uuid (integer) 0 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 1 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 2 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 3 127.0.0.1:6379> hexists 123 uuid (integer) 1 127.0.0.1:6379> hgetall 123 1) "uuid" 2) "3" 127.0.0.1:6379>
protected String getLockName(long threadId) { return id + ":" + threadId; } public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getServiceManager().getId(); this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } //commandExecutor.getServiceManager() 的id默认值 private final String id = UUID.randomUUID().toString();這裡就明白了,欄位名稱是uuid : threadId接下來我們看看鎖定續約的程式碼scheduleExpirationRenewal(threadId);
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); //判断该实例是否加过锁 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { //重入次数+1 oldEntry.addThreadId(threadId); } else { //第一次加锁 entry.addThreadId(threadId); try { //锁续约核心代码 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { //如果线程异常终止,则关闭锁续约线程 cancelExpirationRenewal(threadId); } } } }我們看看renewExpiration()方法###
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //新建一个线程执行 Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //设置锁过期时间为30秒 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //检查锁是还否存在 if (res) { // reschedule itself 10后调用自己 renewExpiration(); } else { //关闭续约 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //注意上行代码internalLockLeaseTime / 3, //internalLockLeaseTime默认30s,那么也就是10s检查一次 ee.setTimeout(task); } //设置锁过期时间为internalLockLeaseTime 也就是30s lua脚本保证原子性 protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。
我们写个测试类看看
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-2416:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); Thread.sleep(1000000); } }
查看锁的过期时间,及是否续约
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 26 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> ttl 123 (integer) 22 127.0.0.1:6379> ttl 123 (integer) 21 127.0.0.1:6379> ttl 123 (integer) 20 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 28 127.0.0.1:6379>
我们再改改代码,看看是否可重入和字段名称是否和我们预期一致
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); lock.lock(); lock.lock(); //加了三次锁,此时重入次数为3 Thread.sleep(3000); //解锁一次,此时重入次数变为3 lock.unlock(); Thread.sleep(1000000); } }
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "3" 127.0.0.1:6379> 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "2" 127.0.0.1:6379>
我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。
redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?
假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤
- 1.客户端记录当前系统时间,以毫秒为单位
- 2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。- 3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功- 4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
- 5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题
这里有下面几个点需要注意:
更多编程相关知识,请访问:编程视频!!
以上是詳細聊聊redis中的分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!