Rumah > Artikel > pangkalan data > Mari bercakap secara terperinci tentang kunci yang diedarkan dalam redis
Kita semua tahu bahawa kunci yang diedarkan mesti digunakan dalam persekitaran yang diedarkan. Jadi apakah ciri yang diperlukan oleh kunci teragih? Bagaimana untuk mengunci redis yang berdiri sendiri? Apakah perangkap penguncian kelompok redis? Jangan risau, mari kita buka tudung kunci yang diedarkan Redis selangkah demi selangkah.
Tidak kira dalam apa jua keadaan Hanya satu. benang boleh memegang kunci.
Persekitaran kelompok redis tidak boleh gagal memperoleh atau melepaskan kunci kerana nod tidak berfungsi. [Cadangan berkaitan: Tutorial video Redis]
Mesti mempunyai mekanisme kawalan tamat masa atau. pembatalan beroperasi.
Kunci sendiri dan lepaskan sendiri. Kunci yang ditambahkan oleh orang lain tidak boleh dilepaskan.
Benang yang sama boleh dikunci beberapa kali.
Secara amnya, ia dilaksanakan menggunakan skrip setnx+lua.
Siarkan kod terus
package com.fandf.test.redis; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * redis 单机锁 * * @author fandongfeng * @date 2023/3/29 06:52 */ @Slf4j @Service public class RedisLock { @Resource RedisTemplate<String, Object> redisTemplate; private static final String SELL_LOCK = "kill:"; /** * 模拟秒杀 * * @return 是否成功 */ public String kill() { String productId = "123"; String key = SELL_LOCK + productId; //锁value,解锁时 用来判断当前锁是否是自己加的 String value = IdUtil.fastSimpleUUID(); //加锁 十秒钟过期 防死锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS); if (!flag) { return "加锁失败"; } try { String productKey = "good123"; //获取商品库存 Integer stock = (Integer) redisTemplate.opsForValue().get(productKey); if (stock == null) { //模拟录入数据, 实际应该加载时从数据库读取 redisTemplate.opsForValue().set(productKey, 100); stock = 100; } if (stock <= 0) { return "卖完了,下次早点来吧"; } //扣减库存, 模拟随机卖出数量 int randomInt = RandomUtil.randomInt(1, 10); redisTemplate.opsForValue().decrement(productKey, randomInt); // 修改db,可以丢到队列里慢慢处理 return "成功卖出" + randomInt + "个,库存剩余" + redisTemplate.opsForValue().get(productKey) + "个"; } finally { // //这种方法会存在删除别人加的锁的可能 // redisTemplate.delete(key); // if(value.equals(redisTemplate.opsForValue().get(key))){ // //因为if条件的判断和 delete不是原子性的, // //if条件判断成功后,恰好锁到期自己解锁 // //此时别的线程如果持有锁了,就会把别人的锁删除掉 // redisTemplate.delete(key); // } //使用lua脚本保证判断和删除的原子性 String luaScript = "if (redis.call('get',KEYS[1]) == ARGV[1]) then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end"; redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value); } } }
Jalankan ujian unit, simulasi seratus utas untuk melakukan pembunuhan kilat pada masa yang sama
package com.fandf.test.redis; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class SignServiceTest { @Resource RedisLock redisLock; @RepeatedTest(100) @Execution(CONCURRENT) public void redisLock() { String result = redisLock.kill(); if("加锁失败".equals(result)) { }else { System.out.println(result); } } }
Hanya tiga utas yang meraih kunci
成功卖出5个,库存剩余95个 成功卖出8个,库存剩余87个 成功卖出7个,库存剩余80个
Secara umum, terdapat dua:
Jadi adakah cara untuk menyelesaikan kedua-dua masalah ini? Ya, mari kita bercakap tentang Redisson
Redisson ialah grid data dalam memori Java (Grid Data Dalam Memori) yang dilaksanakan berdasarkan Redis. Ia bukan sahaja menyediakan satu siri objek Java biasa yang diedarkan, tetapi juga menyediakan banyak perkhidmatan yang diedarkan. Ini termasuk (BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson menyediakan cara yang paling mudah dan paling mudah untuk menggunakan Redis . Tujuan Redisson adalah untuk mempromosikan pengasingan kebimbangan pengguna (Separation of Concern) daripada Redis, supaya pengguna boleh memberi lebih tumpuan kepada memproses logik perniagaan.
Integrasi sangat mudah, hanya dua langkah
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> </dependency>
spring: application: name: test redis: host: 127.0.0.1 port: 6379
Ia juga sangat mudah digunakan, hanya menyuntik RedissonClient
package com.fandf.test.redis; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author fandongfeng */ @Component @Slf4j public class RedissonTest { @Resource RedissonClient redissonClient; public void test() { RLock rLock = redissonClient.getLock("anyKey"); //rLock.lock(10, TimeUnit.SECONDS); rLock.lock(); try { // do something } catch (Exception e) { log.error("业务异常", e); } finally { rLock.unlock(); } } }
Rakan yang mungkin tidak tahu redisson tidak boleh tidak bertanya soalan.
Apakah anda tidak perlu menambah masa tamat tempoh semasa mengunci? Adakah ini akan menyebabkan kebuntuan? Tidakkah memerlukan pertimbangan untuk membuka kunci sama ada anda memegangnya sendiri?
Haha, jangan risau, kami akan mendedahkan penyusunan semula langkah demi langkah.
Mari ikut kaedah lock() langkah demi langkah dan lihat kod sumber (versi redisson tempatan ialah 3.20.0)
//RedissonLock.class @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
Kunci Lihat (-1, batal, palsu); Kaedah
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //获取当前线程id long threadId = Thread.currentThread().getId(); //加锁代码块, 返回锁的失效时间 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }
Mari lihat bagaimana ia dikunci, iaitu kaedah tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //真假加锁方法 tryAcquireAsync return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //waitTime和leaseTime都是-1,所以走这里 //过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); //跟进去源码发现默认值是30秒, private long lockWatchdogTimeout = 30 * 1000; ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture); ttlRemainingFuture = new CompletableFutureWrapper<>(s); //加锁成功,开启子线程进行续约 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if (ttlRemaining == null) { if (leaseTime > 0) { //如果指定了过期时间,则不续约 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //没指定过期时间,或者小于0,在这里实现锁自动续约 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
The kod di atas mengandungi tambahan Logik penguncian dan pembaharuan kunci, mari kita lihat kod penguncian
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) " + "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
Ia akan menjadi sangat jelas di sini Redisson menggunakan skrip Lua untuk memastikan atomicity arahan.
redis.call('hexists', KEYS[1], ARGV[2]) Semak sama ada nilai kunci wujud.
Arahan Redis Hexists digunakan untuk menyemak sama ada medan yang ditentukan bagi jadual cincang wujud.
Mengembalikan 1 jika jadual cincang mengandungi medan yang diberikan. Jika jadual cincang tidak mengandungi medan yang diberikan, atau kunci tidak wujud, 0 dikembalikan.
127.0.0.1:6379> hexists 123 uuid (integer) 0 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 1 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 2 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 3 127.0.0.1:6379> hexists 123 uuid (integer) 1 127.0.0.1:6379> hgetall 123 1) "uuid" 2) "3" 127.0.0.1:6379>
Apabila kunci tidak wujud, atau sudah mengandungi medan tertentu (iaitu, ia telah dikunci, ini untuk mencapai kemasukan semula), tambah terus 1 pada nilai medan
Medan ini Nilai, iaitu ARGV[2], diperoleh dengan kaedah getLockName(threadId) Mari kita lihat nilai medan ini
protected String getLockName(long threadId) { return id + ":" + threadId; } public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getServiceManager().getId(); this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; } //commandExecutor.getServiceManager() 的id默认值 private final String id = UUID.randomUUID().toString();
dan kami akan faham di sini ialah uuid + : + threadId
Seterusnya mari kita lihat jadual kod pembaharuan kunciExpirationRenewal(threadId);
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); //判断该实例是否加过锁 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { //重入次数+1 oldEntry.addThreadId(threadId); } else { //第一次加锁 entry.addThreadId(threadId); try { //锁续约核心代码 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { //如果线程异常终止,则关闭锁续约线程 cancelExpirationRenewal(threadId); } } } }
Mari lihat kaedah renewExpiration()
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //新建一个线程执行 Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //设置锁过期时间为30秒 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //检查锁是还否存在 if (res) { // reschedule itself 10后调用自己 renewExpiration(); } else { //关闭续约 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //注意上行代码internalLockLeaseTime / 3, //internalLockLeaseTime默认30s,那么也就是10s检查一次 ee.setTimeout(task); } //设置锁过期时间为internalLockLeaseTime 也就是30s lua脚本保证原子性 protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。
我们写个测试类看看
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-2416:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); Thread.sleep(1000000); } }
查看锁的过期时间,及是否续约
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 26 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> ttl 123 (integer) 22 127.0.0.1:6379> ttl 123 (integer) 21 127.0.0.1:6379> ttl 123 (integer) 20 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 28 127.0.0.1:6379>
我们再改改代码,看看是否可重入和字段名称是否和我们预期一致
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); lock.lock(); lock.lock(); //加了三次锁,此时重入次数为3 Thread.sleep(3000); //解锁一次,此时重入次数变为3 lock.unlock(); Thread.sleep(1000000); } }
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "3" 127.0.0.1:6379> 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "2" 127.0.0.1:6379>
我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。
redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?
假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤
- 1.客户端记录当前系统时间,以毫秒为单位
- 2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。- 3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功- 4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
- 5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题
这里有下面几个点需要注意:
更多编程相关知识,请访问:编程视频!!
Atas ialah kandungan terperinci Mari bercakap secara terperinci tentang kunci yang diedarkan dalam redis. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!