Heim >Datenbank >Redis >Was ist die Methode zur Implementierung der verteilten Redis-Sperre?

Was ist die Methode zur Implementierung der verteilten Redis-Sperre?

王林
王林nach vorne
2023-05-28 17:58:271745Durchsuche

    1. Was ist eine verteilte Sperre?

    Eine verteilte Sperre ist eine Sperre, die es ermöglicht, dass mehrere Prozesse in einem verteilten System oder Clustermodus sichtbar sind und sich gegenseitig ausschließen.

    Verteilte Sperren basierend auf Redis implementieren:

    1. Sperren erwerben

    • Gegenseitiger Ausschluss: Stellen Sie sicher, dass nur ein Thread die Sperre erwerben kann;

    • Nicht blockierend: Versuchen Sie, die Sperre zu erwerben, und geben Sie bei Erfolg „true“ zurück , false, wenn fehlgeschlagen;

    Ablaufzeit der Sperre hinzufügen, um einen durch Dienstausfallzeiten verursachten Deadlock zu vermeiden.

    SET lock thread1 NX EX 10SET lock thread1 NX EX 10

    2、释放锁

    • 手动释放;DEL key1

    • 超时释放,获取锁时添加一个超时锁;

    二、代码实例

    package com.guor.utils;
    
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    public class RedisLock implements ILock{
    
        private String name;
        private StringRedisTemplate stringRedisTemplate;
    
        public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
            this.name = name;
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        private static final String KEY_PREFIX = "lock:";
    
        @Override
        public boolean tryLock(long timeout) {
            // 获取线程唯一标识
            long threadId = Thread.currentThread().getId();
            // 获取锁
            Boolean success = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadId+"", timeout, TimeUnit.SECONDS);
            // 防止拆箱的空指针异常
            return Boolean.TRUE.equals(success);
        }
    
        @Override
        public void unlock() {
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }

    上面代码存在锁误删问题:

    1. 如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;

    2. 此时,线程2尝试获取锁,成功,并执行业务;

    3. 此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);

    4. 但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题

    在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。

    package com.guor.utils;
    
    import cn.hutool.core.lang.UUID;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    import java.util.concurrent.TimeUnit;
    
    public class RedisLock implements ILock{
    
        private String name;
        private StringRedisTemplate stringRedisTemplate;
    
        public RedisLock(String name, StringRedisTemplate stringRedisTemplate) {
            this.name = name;
            this.stringRedisTemplate = stringRedisTemplate;
        }
    
        private static final String KEY_PREFIX = "lock:";
        private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + "-";
    
        @Override
        public boolean tryLock(long timeout) {
            // 获取线程唯一标识
            String threadId = UUID_PREFIX + Thread.currentThread().getId();
            // 获取锁
            Boolean success = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
            // 防止拆箱的空指针异常
            return Boolean.TRUE.equals(success);
        }
    
        @Override
        public void unlock() {
            // 获取线程唯一标识
            String threadId = UUID_PREFIX + Thread.currentThread().getId();
            // 获取锁中的标识
            String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
            // 判断标示是否一致
            if(threadId.equals(id)) {
                // 释放锁
                stringRedisTemplate.delete(KEY_PREFIX + name);
            }
        }
    }

    三、基于SETNX

    2, Sperre manuell aufheben; DEL key1

    timeout release, beim Erfassen eine Zeitüberschreitung hinzufügen lock Lock;

    2. Codebeispiel

    <!--redisson-->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.13.6</version>
    </dependency>

    Der obige Code hat das Problem, dass die Sperre versehentlich gelöscht wird:

      Wenn Thread 1 die Sperre erhält, aber Thread 1 ist blockiert, was dazu führt, dass Redis die Sperre nach einer Zeitüberschreitung aufhebt;

      Zu diesem Zeitpunkt versucht Thread 2, die Sperre zu erlangen, ist erfolgreich und führt das Geschäft aus Aufgabe, schließt die Ausführung ab und gibt die Sperre frei (d. h. löscht die Sperre).

      Die von Thread 1 gelöschte Sperre ist jedoch dieselbe Sperre wie die Sperre von Thread 2. Dies ist der verteilte Problem mit versehentlichem Löschen der Sperre;

    Beim Aufheben der Sperre kann das Aufheben der eigenen verteilten Sperre des Threads dieses Problem lösen.

    package com.guor.config;
    
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RedissonConfig {
    
        @Bean
        public RedissonClient redissonClient(){
            // 配置
            Config config = new Config();
    
            /**
             * 单点地址useSingleServer,集群地址useClusterServers
             */
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
            // 创建RedissonClient对象
            return Redisson.create(config);
        }
    }

    3. Die auf SETNX implementierte verteilte Sperre weist die folgenden Probleme auf:

    1 Es ist nicht wiedereintrittsfähig: Derselbe Thread kann nicht mehrmals dieselbe Sperre erwerben.

    2. Kein Wiederholungsversuch.
    Der Erwerb der Sperre erfolgt nur einmal und gibt „False“ zurück. Es gibt keinen Wiederholungsmechanismus.

    3. Timeout-Freigabe

    Obwohl Timeout-Freigabe von Sperren Deadlocks vermeiden kann, führt dies bei längerer Geschäftsausführung auch zur Sperrenfreigabe, was ein Sicherheitsrisiko darstellt.
    4. Master-Slave-Konsistenz

    Wenn Redis in einem Cluster bereitgestellt wird, kommt es zu einer Verzögerung bei der Master-Slave-Synchronisierung. Wenn der Host ausfällt, wird ein Slave als Host ausgewählt, der Slave verfügt jedoch nicht über eine Die Sperrkennung kann zu diesem Zeitpunkt von anderen Threads übernommen werden, was zu Sicherheitsproblemen führen kann.

    4. Redisson implementiert verteilte Sperren

    Redisson ist ein Java-Speicherdatengitter, das auf der Redis-Implementierung basiert. Neben der Bereitstellung häufig verwendeter verteilter Java-Objekte werden auch viele verteilte Dienste bereitgestellt, einschließlich der Implementierung verschiedener verteilter Sperren.

    1. pom🎜
    package com.guor;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    @SpringBootTest
    class RedissonTest {
    
        @Resource
        private RedissonClient redissonClient;
    
        private RLock lock;
    
        @BeforeEach
        void setUp() {
        	// 获取指定名称的锁
            lock = redissonClient.getLock("nezha");
        }
    
        @Test
        void test() throws InterruptedException {
            // 尝试获取锁
            boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
            if (!isLock) {
                log.error("获取锁失败");
                return;
            }
            try {
                log.info("哪吒最帅,哈哈哈");
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    }
    🎜3. Testen Sie den tryLock-Quellcode🎜🎜Versuchen Sie, die Sperre zu erhalten🎜
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    	// 最大等待时间
    	long time = unit.toMillis(waitTime);
    	long current = System.currentTimeMillis();
    	long threadId = Thread.currentThread().getId();
    	Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    	if (ttl == null) {
    		return true;
    	} else {
    		// 剩余等待时间 = 最大等待时间 - 获取锁失败消耗的时间
    		time -= System.currentTimeMillis() - current;
    		if (time <= 0L) {// 获取锁失败
    			this.acquireFailed(waitTime, unit, threadId);
    			return false;
    		} else {
    			// 再次尝试获取锁
    			current = System.currentTimeMillis();
    			// subscribe订阅其它释放锁的信号
    			RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
    			// 当Future在等待指定时间time内完成时,返回true
    			if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    				if (!subscribeFuture.cancel(false)) {
    					subscribeFuture.onComplete((res, e) -> {
    						if (e == null) {
    							// 取消订阅
    							this.unsubscribe(subscribeFuture, threadId);
    						}
    
    					});
    				}
    
    				this.acquireFailed(waitTime, unit, threadId);
    				return false;// 获取锁失败
    			} else {
    				try {
    					// 剩余等待时间 = 剩余等待时间 - 获取锁失败消耗的时间
    					time -= System.currentTimeMillis() - current;
    					if (time <= 0L) {
    						this.acquireFailed(waitTime, unit, threadId);
    						boolean var20 = false;
    						return var20;
    					} else {
    						boolean var16;
    						do {
    							long currentTime = System.currentTimeMillis();
    							// 重试获取锁
    							ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    							if (ttl == null) {
    								var16 = true;
    								return var16;
    							}
    							// 再次失败了,再看一下剩余时间
    							time -= System.currentTimeMillis() - currentTime;
    							if (time <= 0L) {
    								this.acquireFailed(waitTime, unit, threadId);
    								var16 = false;
    								return var16;
    							}
    							// 再重试获取锁
    							currentTime = System.currentTimeMillis();
    							if (ttl >= 0L && ttl < time) {
    								// 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false
    								((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    							} else {
    								((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    							}
    							time -= System.currentTimeMillis() - currentTime;
    						} while(time > 0L);
    
    						this.acquireFailed(waitTime, unit, threadId);
    						var16 = false;
    						return var16;
    					}
    				} finally {
    					this.unsubscribe(subscribeFuture, threadId);
    				}
    			}
    		}
    	}
    }
    🎜2 Zeitraum der Sperre🎜
    private void scheduleExpirationRenewal(long threadId) {
    	RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    	// this.getEntryName():锁的名字,一个锁对应一个entry
    	// putIfAbsent:如果不存在,将锁和entry放到map里
    	RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    	if (oldEntry != null) {
    		// 同一个线程多次获取锁,相当于重入
    		oldEntry.addThreadId(threadId);
    	} else {
    		// 如果是第一次
    		entry.addThreadId(threadId);
    		// 更新有效期
    		this.renewExpiration();
    	}
    }
    🎜Aktualisieren Sie den Gültigkeitszeitraum, rufen Sie ihn rekursiv auf, er läuft nie ab🎜
    private void renewExpiration() {
    	// 从map中得到当前锁的entry
    	RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    	if (ee != null) {
    		// 开启延时任务
    		Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    			public void run(Timeout timeout) throws Exception {
    				RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
    				if (ent != null) {
    					// 取出线程id
    					Long threadId = ent.getFirstThreadId();
    					if (threadId != null) {
    						// 刷新有效期
    						RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
    						future.onComplete((res, e) -> {
    							if (e != null) {
    								RedissonLock.log.error("Can&#39;t update lock " + RedissonLock.this.getName() + " expiration", e);
    							} else {
    								if (res) {
    									// 递归调用更新有效期,永不过期
    									RedissonLock.this.renewExpiration();
    								}
    							}
    						});
    					}
    				}
    			}
    		}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 10S
    		ee.setTimeout(task);
    	}
    }
    🎜3. Rufen Sie das Lua-Skript auf🎜
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    	return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
    	// 判断当前线程的锁是否是当前线程
    	"if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then 
    		// 更新有效期
    		redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); 
    		return 1; 
    		end; 
    		return 0;", 
    		Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
    🎜6 🎜🎜1. Brechen Sie die Aktualisierungsaufgabe ab🎜
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    	// 锁释放时间
    	this.internalLockLeaseTime = unit.toMillis(leaseTime);
    	return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
    		// 判断锁成功
    		"if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then
    			redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); // 如果不存在,记录锁标识,次数+1
    			redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); // 设置锁有效期
    			return nil; // 相当于Java的null
    		end; 
    		if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then 
    			redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1
    			redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); // 设置锁有效期
    			return nil; 
    		end; 
    		// 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称
    		return redis.call(&#39;pttl&#39;, KEYS[1]);", 
    			Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
    🎜2. Löschen Sie die geplante Aufgabe🎜
    public RFuture<Void> unlockAsync(long threadId) {
    	RPromise<Void> result = new RedissonPromise();
    	RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    	future.onComplete((opStatus, e) -> {
    		// 取消更新任务
    		this.cancelExpirationRenewal(threadId);
    		if (e != null) {
    			result.tryFailure(e);
    		} else if (opStatus == null) {
    			IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
    			result.tryFailure(cause);
    		} else {
    			result.trySuccess((Object)null);
    		}
    	});
    	return result;
    }

    Das obige ist der detaillierte Inhalt vonWas ist die Methode zur Implementierung der verteilten Redis-Sperre?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen