Maison >base de données >Redis >Quelle est la méthode d'implémentation du verrouillage distribué Redis ?

Quelle est la méthode d'implémentation du verrouillage distribué Redis ?

王林
王林avant
2023-05-28 17:58:271734parcourir

    1. Qu'est-ce qu'un verrou distribué ?

    Un verrou distribué est un verrou qui permet à plusieurs processus d'être visibles et mutuellement exclusifs dans un système distribué ou en mode cluster.

    Implémentation de verrous distribués basés sur Redis :

    1. Acquérir des verrous

    • Exclusion mutuelle : assurez-vous qu'un seul thread peut acquérir le verrou

    • Non bloquant : essayez d'acquérir le verrou, retournez true en cas de succès ; , false en cas d'échec ;

    Ajoutez un délai d'expiration du verrouillage pour éviter un blocage causé par un temps d'arrêt du service.

    SET lock thread1 NX EX 10SET lock thread1 NX EX 10

    2、释放锁

    • 手动释放;DEL key1

    • 超时释放,获取锁时添加一个超时锁;

    二、代码实例

    package com.guor.utils;
    
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    public class RedisLock implements ILock{
    
        private String name;
        private StringRedisTemplate stringRedisTemplate;
    
        public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
            this.name = name;
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        private static final String KEY_PREFIX = "lock:";
    
        @Override
        public boolean tryLock(long timeout) {
            // 获取线程唯一标识
            long threadId = Thread.currentThread().getId();
            // 获取锁
            Boolean success = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadId+"", timeout, TimeUnit.SECONDS);
            // 防止拆箱的空指针异常
            return Boolean.TRUE.equals(success);
        }
    
        @Override
        public void unlock() {
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }

    上面代码存在锁误删问题:

    1. 如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;

    2. 此时,线程2尝试获取锁,成功,并执行业务;

    3. 此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);

    4. 但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题

    在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。

    package com.guor.utils;
    
    import cn.hutool.core.lang.UUID;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    public class RedisLock implements ILock{
    
        private String name;
        private StringRedisTemplate stringRedisTemplate;
    
        public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
            this.name = name;
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        private static final String KEY_PREFIX = "lock:";
        private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + "-";
    
        @Override
        public boolean tryLock(long timeout) {
            // 获取线程唯一标识
            String threadId = UUID_PREFIX + Thread.currentThread().getId();
            // 获取锁
            Boolean success = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
            // 防止拆箱的空指针异常
            return Boolean.TRUE.equals(success);
        }
    
        @Override
        public void unlock() {
            // 获取线程唯一标识
            String threadId = UUID_PREFIX + Thread.currentThread().getId();
            // 获取锁中的标识
            String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
            // 判断标示是否一致
            if(threadId.equals(id)) {
                // 释放锁
                stringRedisTemplate.delete(KEY_PREFIX + name);
            }
        }
    }

    三、基于SETNX

    2, relâchez le verrou

    manuellement ; DEL key1

    timeout release, ajoutez un timeout lors de l'acquisition du lock Lock;

    2. Exemple de code

    <!--redisson-->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.13.6</version>
    </dependency>

    Le code ci-dessus a le problème de supprimer accidentellement le verrou :

      Si le thread 1 acquiert le verrou, mais le thread 1 est bloqué, ce qui fait que Redis libère le verrou après l'expiration du délai

      À ce moment, le thread 2 essaie d'acquérir le verrou, réussit et exécute l'entreprise

      À ce moment, le thread 1 redémarre pour exécuter le ; tâche, termine l'exécution et libère le verrou (c'est-à-dire supprime le verrou) ;

      Cependant, le verrou supprimé par le thread 1 est le même verrou que le verrou du thread 2. Il s'agit du distribué problème de suppression accidentelle du verrou;

    lors de la libération du verrou, la libération du propre verrou distribué du fil peut résoudre ce problème.

    package com.guor.config;
    
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RedissonConfig {
    
        @Bean
        public RedissonClient redissonClient(){
            // 配置
            Config config = new Config();
    
            /**
             * 单点地址useSingleServer,集群地址useClusterServers
             */
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
            // 创建RedissonClient对象
            return Redisson.create(config);
        }
    }

    3. Le verrou distribué implémenté sur la base de SETNX présente les problèmes suivants

    1 Il n'est pas réentrant
    Le même thread ne peut pas acquérir le même verrou plusieurs fois.

    2. Aucune nouvelle tentative

    L'acquisition du verrou n'essaie qu'une seule fois et renvoie false. Il n'y a pas de mécanisme de nouvelle tentative.

    3. Libération du délai d'attente

    Bien que la libération du verrou après expiration du délai puisse éviter une impasse, si l'exécution de l'entreprise prend beaucoup de temps, cela entraînera également la libération du verrou, ce qui présente des risques de sécurité. 🎜🎜4. Cohérence maître-esclave 🎜🎜Si Redis est déployé dans un cluster, il y a un retard dans la synchronisation maître-esclave lorsque l'hôte tombe en panne, un esclave sera sélectionné comme hôte, mais l'esclave n'a pas de serveur. identifiant de verrouillage à ce moment-là. D'autres threads peuvent acquérir le verrou, provoquant des problèmes de sécurité. 🎜🎜4. Redisson implémente des verrous distribués🎜🎜Redisson est une grille de données de mémoire Java basée sur l'implémentation de Redis. En plus de fournir des objets Java distribués couramment utilisés, il fournit également de nombreux services distribués, notamment la mise en œuvre de divers verrous distribués. 🎜🎜1. pom🎜
    package com.guor;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    @SpringBootTest
    class RedissonTest {
    
        @Resource
        private RedissonClient redissonClient;
    
        private RLock lock;
    
        @BeforeEach
        void setUp() {
        	// 获取指定名称的锁
            lock = redissonClient.getLock("nezha");
        }
    
        @Test
        void test() throws InterruptedException {
            // 尝试获取锁
            boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
            if (!isLock) {
                log.error("获取锁失败");
                return;
            }
            try {
                log.info("哪吒最帅,哈哈哈");
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
    🎜2. Classe de configuration🎜
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    	// 最大等待时间
    	long time = unit.toMillis(waitTime);
    	long current = System.currentTimeMillis();
    	long threadId = Thread.currentThread().getId();
    	Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    	if (ttl == null) {
    		return true;
    	} else {
    		// 剩余等待时间 = 最大等待时间 - 获取锁失败消耗的时间
    		time -= System.currentTimeMillis() - current;
    		if (time <= 0L) {// 获取锁失败
    			this.acquireFailed(waitTime, unit, threadId);
    			return false;
    		} else {
    			// 再次尝试获取锁
    			current = System.currentTimeMillis();
    			// subscribe订阅其它释放锁的信号
    			RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
    			// 当Future在等待指定时间time内完成时,返回true
    			if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    				if (!subscribeFuture.cancel(false)) {
    					subscribeFuture.onComplete((res, e) -> {
    						if (e == null) {
    							// 取消订阅
    							this.unsubscribe(subscribeFuture, threadId);
    						}
    
    					});
    				}
    
    				this.acquireFailed(waitTime, unit, threadId);
    				return false;// 获取锁失败
    			} else {
    				try {
    					// 剩余等待时间 = 剩余等待时间 - 获取锁失败消耗的时间
    					time -= System.currentTimeMillis() - current;
    					if (time <= 0L) {
    						this.acquireFailed(waitTime, unit, threadId);
    						boolean var20 = false;
    						return var20;
    					} else {
    						boolean var16;
    						do {
    							long currentTime = System.currentTimeMillis();
    							// 重试获取锁
    							ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    							if (ttl == null) {
    								var16 = true;
    								return var16;
    							}
    							// 再次失败了,再看一下剩余时间
    							time -= System.currentTimeMillis() - currentTime;
    							if (time <= 0L) {
    								this.acquireFailed(waitTime, unit, threadId);
    								var16 = false;
    								return var16;
    							}
    							// 再重试获取锁
    							currentTime = System.currentTimeMillis();
    							if (ttl >= 0L && ttl < time) {
    								// 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false
    								((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    							} else {
    								((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    							}
    							time -= System.currentTimeMillis() - currentTime;
    						} while(time > 0L);
    
    						this.acquireFailed(waitTime, unit, threadId);
    						var16 = false;
    						return var16;
    					}
    				} finally {
    					this.unsubscribe(subscribeFuture, threadId);
    				}
    			}
    		}
    	}
    }
    🎜3. Classe de test🎜
    private void scheduleExpirationRenewal(long threadId) {
    	RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    	// this.getEntryName():锁的名字,一个锁对应一个entry
    	// putIfAbsent:如果不存在,将锁和entry放到map里
    	RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    	if (oldEntry != null) {
    		// 同一个线程多次获取锁,相当于重入
    		oldEntry.addThreadId(threadId);
    	} else {
    		// 如果是第一次
    		entry.addThreadId(threadId);
    		// 更新有效期
    		this.renewExpiration();
    	}
    }
    🎜 5. Explorez le code source de tryLock🎜🎜1. Essayez d'acquérir le verrou🎜
    private void renewExpiration() {
    	// 从map中得到当前锁的entry
    	RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    	if (ee != null) {
    		// 开启延时任务
    		Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    			public void run(Timeout timeout) throws Exception {
    				RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
    				if (ent != null) {
    					// 取出线程id
    					Long threadId = ent.getFirstThreadId();
    					if (threadId != null) {
    						// 刷新有效期
    						RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
    						future.onComplete((res, e) -> {
    							if (e != null) {
    								RedissonLock.log.error("Can&#39;t update lock " + RedissonLock.this.getName() + " expiration", e);
    							} else {
    								if (res) {
    									// 递归调用更新有效期,永不过期
    									RedissonLock.this.renewExpiration();
    								}
    							}
    						});
    					}
    				}
    			}
    		}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 10S
    		ee.setTimeout(task);
    	}
    }
    🎜2. période du verrou🎜
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    	return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
    	// 判断当前线程的锁是否是当前线程
    	"if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then 
    		// 更新有效期
    		redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); 
    		return 1; 
    		end; 
    		return 0;", 
    		Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
    🎜Mettez à jour la période de validité, appelez récursivement pour mettre à jour la période de validité, elle n'expirera jamais🎜
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    	// 锁释放时间
    	this.internalLockLeaseTime = unit.toMillis(leaseTime);
    	return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
    		// 判断锁成功
    		"if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then
    			redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); // 如果不存在,记录锁标识,次数+1
    			redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); // 设置锁有效期
    			return nil; // 相当于Java的null
    		end; 
    		if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then 
    			redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1
    			redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); // 设置锁有效期
    			return nil; 
    		end; 
    		// 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称
    		return redis.call(&#39;pttl&#39;, KEYS[1]);", 
    			Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
    🎜Mettez à jour la période de validité🎜
    public RFuture<Void> unlockAsync(long threadId) {
    	RPromise<Void> result = new RedissonPromise();
    	RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    	future.onComplete((opStatus, e) -> {
    		// 取消更新任务
    		this.cancelExpirationRenewal(threadId);
    		if (e != null) {
    			result.tryFailure(e);
    		} else if (opStatus == null) {
    			IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
    			result.tryFailure(cause);
    		} else {
    			result.trySuccess((Object)null);
    		}
    	});
    	return result;
    }
    🎜3. Appelez le script lua🎜
    void cancelExpirationRenewal(Long threadId) {
    	// 从map中取出当前锁的定时任务entry
    	RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    	if (task != null) {
    		if (threadId != null) {
    			task.removeThreadId(threadId);
    		}
    		// 删除定时任务
    		if (threadId == null || task.hasNoThreads()) {
    			Timeout timeout = task.getTimeout();
    			if (timeout != null) {
    				timeout.cancel();
    			}
    
    			EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
    		}
    	}
    }
    🎜6. 🎜🎜1. Annulez la tâche de mise à jour🎜rrreee🎜2 Supprimez la tâche planifiée🎜rrreee.

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer