>데이터 베이스 >Redis >Redis 분산 잠금을 갱신하는 방법

Redis 분산 잠금을 갱신하는 방법

WBOY
WBOY앞으로
2023-05-27 22:26:062743검색

Redis 분산 잠금 갱신 방법

Redis 분산 잠금의 올바른 자세

Fei Chao에 따르면 많은 학생들이 분산 잠금을 사용할 때 Baidu에서 직접 검색하여 Redis 분산 잠금 도구 클래스를 찾아 직접 사용하는 것이 핵심입니다. 이 도구 클래스는 또한 많은 System.out.println(); 및 기타 명령문으로 채워져 있습니다. 실제로 Redis 분산 잠금에 대한 더 올바른 접근 방식은 클라이언트 도구 redisson을 사용하는 것입니다. -섹스 데이트 웹사이트 github.

답변

먼저 Redis의 분산 잠금 장치를 올바르게 사용하고 해당 공식 문서를 읽었다면 이 질문은 매우 쉽습니다. 영어가 훌륭하다면 영어 문서를 읽어보시면 이해가 더 쉬울 것입니다

Redis 분산 잠금을 갱신하는 방법기본적으로 잠금 감시 시간 제한은 30초이며 Config.lockWatchdogTimeout 설정을 통해 변경할 수 있습니다.

하지만 중국어 문서를 읽으면

Watchdog이 잠금을 확인합니다. 기본 타임아웃 시간은 30초입니다

페이차오라는 문장은 언어학적으로 두 가지 의미를 갖는 문장입니다.

1. Watchdog은 타임아웃 시간을 확인하기 위해 기본적으로 30초를 설정합니다. a lock

2. Watchdog은 lock timeout을 확인합니다. 기본 lock 시간은 30초입니다

이것을 보고 우리 초등학교 체육 선생님과 중국어 선생님이 같은 사람이지만 모두가 비난하지 않기를 바랍니다. .중국어가 잘 안되면 소스를 만들어도 됩니다!

소스코드 분석

공식문서에 나와 있는 예제를 바탕으로 가장 간단한 데모를 작성했습니다. 예제는 Ctr+C와 Ctr+를 기반으로 합니다. 위 스크린샷의 V wave 작업은 다음과 같습니다

public class DemoMain {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        RLock lock = redisson.getLock("anyLock");
        lock.lock();
        //lock.unlock();
    }
}

create

여기서 우리는 두 매개 변수 InternalLockLeaseTime과 lockWatchdogTimeout이 동일하다는 것을 알 수 있습니다.

lockWatchdogTimeout의 기본값은 다음과 같습니다Redis 분산 잠금을 갱신하는 방법

public class Config {	
	private long lockWatchdogTimeout = 30 * 1000;		
	public long getLockWatchdogTimeout() {
		return lockWatchdogTimeout;
	}	
	//省略无关代码
}

할 수 있습니다. 또한 InternalLockLeaseTime이라는 단어를 보면 이 추가된 분산 잠금의 기본 시간 초과가 30초라는 것을 알 수 있습니다. 하지만 또 다른 질문이 있습니다. 즉, 이 감시 장치가 유효 기간을 얼마나 자주 연장하는지 살펴보겠습니다.

사진의 프레임 영역을 보면 잠금을 성공적으로 획득하면 예약된 작업, 즉 watchdog이 시작된다는 것을 알 수 있습니다. 예약된 작업은 renewExpirationAsync(threadId) 갱신을 주기적으로 확인합니다. netty-common 패키지의 HashedWheelTimer. Feichao 공식 계정은 주요 검색 엔진과 긴밀한 협력 관계를 구축했습니다. 관련 API 매개변수의 의미를 보려면 모든 검색 엔진에서 이 클래스를 검색하기만 하면 됩니다. , 예정된 일정의 각 호출 간의 시간 차이는 InternalLockLeaseTime / 3 이라는 것을 알고 있습니다. 단 10초입니다.

진실은 밝혀졌습니다

소스 코드 분석을 통해 기본적으로 잠금 시간은 30초임을 알 수 있습니다. 잠긴 업무가 완료되지 않은 경우 30-10 = 20초가 지나면 갱신이 수행되고 잠금이 30초로 재설정됩니다. 이때 일부 학생들은 다시 업무 머신이 다운되면 어떻게 되는지 묻습니다. ? 다운되면 예약된 작업을 실행할 수 없고 기간을 갱신할 수 없으며 당연히 30초 후에 잠금이 해제됩니다.

Redis 분산 잠금의 5가지 함정

Redis 분산 잠금을 갱신하는 방법 1. 잠금이 해제되지 않습니다.

이 상황은 제가 위에서 했던 실수인 저수준 실수입니다. 현재 스레드로 인해 Redis 잠금을 획득한 후 비즈니스 처리 후 제때에 잠금이 해제되지 않아 다른 스레드가 계속 잠금을 획득하려고 시도하게 됩니다. 예를 들어 Jedis 클라이언트를 사용하면 다음 오류 메시지가 보고됩니다.

redis.clients.jedis.Exceptions.JedisConnectionException: Could not get a resources from the pool

Redis 스레드 풀에는 처리할 여유 스레드가 없습니다. 클라이언트 명령.

해결 방법도 매우 간단합니다. 잠금을 얻은 스레드는 비즈니스를 처리한 후 시간 내에 잠금을 해제하지만 잠금을 얻은 후에는 잠금을 해제할 수 있습니다. 현재 연결을 끊고 일정 시간 동안 절전 모드로 전환합니다.

public void lock() {
    while (true) {
        boolean flag = this.getLock(key);
        if (flag) {
              TODO .........
        } else {
              // 释放当前redis连接
              redis.close();
              // 休眠1000毫秒
             sleep(1000);
       }
     }
 }

2. B의 잠금이 A에 의해 해제되었습니다

우리는 Redis의 잠금 구현 원리가 SETNX 명령에 있다는 것을 알고 있습니다. 키가 없으면 키 값이 value로 설정되고 반환 값은 1입니다. 지정된 키가 이미 있으면 SETNX는 어떤 작업도 수행하지 않고 반환 값은 0입니다.

SETNX key value
두 스레드 A와 B가 myLock 키를 잠그려고 시도합니다. 스레드 A가 먼저 잠금을 획득하고(잠금이 3초 후에 만료되는 경우) 지금까지 스레드 B가 잠금을 획득하려고 기다리고 있습니다. 전혀 문제가 없습니다.

이때 비즈니스 로직에 시간이 많이 걸리고 실행 시간이 redis 잠금 만료 시간을 초과한 경우 스레드 A의 잠금이 자동으로 해제되고(키가 삭제됨) 스레드 B는 myLock 키가 수행하는 것을 감지합니다. 존재하지 않으며 SETNX 명령을 실행하여 잠금을 얻습니다.

그러나 스레드 A가 비즈니스 로직을 완료하더라도 잠금은 여전히 ​​해제되므로(즉, 키가 삭제됨) 스레드 B의 잠금도 스레드 A에 의해 해제됩니다.

위 상황을 방지하려면 일반적으로 잠금 시 각 스레드를 식별하기 위해 고유한 값을 가져와 지정된 값으로만 ​​키를 놓아야 합니다. 그렇지 않으면 혼란스러운 잠금 해제 장면이 발생하게 됩니다.

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

 @Transaction
 public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
              insert();
          }
      }
 }

给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

  @Autowired
  DataSourceTransactionManager dataSourceTransactionManager;
  @Transaction
  public void lock() {
      //手动开启事务
      TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
      try {
          while (true) {
             boolean flag = this.getLock(key);
             if (flag) {
                 insert();
                 //手动提交事务
                 dataSourceTransactionManager.commit(transactionStatus);
             }
         }
     } catch (Exception e) {
         //手动回滚事务
         dataSourceTransactionManager.rollback(transactionStatus);
     }
 }

四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?

那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

要是redis锁的过期时间能够自动续期就好了。

为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。

redisson对分布式锁做了很好封装,只需调用API即可。

RLock lock = redissonClient.getLock("stockLock");

redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。

举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。

@Slf4j
@Service
public class RedisDistributionLockPlus {
   /**
    * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
    */
   private static final long DEFAULT_LOCK_TIMEOUT = 30;
  private static final long TIME_SECONDS_FIVE = 5 ;
  /**
   * 每个key的过期时间 {@link LockContent}
   */
  private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
  /**
   * redis执行成功的返回
   */
  private static final Long EXEC_SUCCESS = 1L;
  /**
   * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
   */
  private static final String LOCK_SCRIPT = "if redis.call(&#39;exists&#39;, KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call(&#39;get&#39;, KEYS[2]) + 10) end " +
          "if redis.call(&#39;exists&#39;, KEYS[1]) == 0 then " +
             "local t = redis.call(&#39;set&#39;, KEYS[1], ARGV[1], &#39;EX&#39;, ARGV[2]) " +
             "for k, v in pairs(t) do " +
               "if v == &#39;OK&#39; then return tonumber(ARGV[2]) end " +
             "end " +
          "return 0 end";
  /**
   * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
   */
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then " +
          "local ctime = tonumber(ARGV[2]) " +
          "local biz_timeout = tonumber(ARGV[3]) " +
          "if ctime > 0 then  " +
             "if redis.call(&#39;exists&#39;, KEYS[2]) == 1 then " +
                 "local avg_time = redis.call(&#39;get&#39;, KEYS[2]) " +
                 "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                 "if avg_time >= biz_timeout - 5 then redis.call(&#39;set&#39;, KEYS[2], avg_time, &#39;EX&#39;, 24*60*60) " +
                 "else redis.call(&#39;del&#39;, KEYS[2]) end " +
             "elseif ctime > biz_timeout -5 then redis.call(&#39;set&#39;, KEYS[2], ARGV[2], &#39;EX&#39;, 24*60*60) end " +
          "end " +
          "return redis.call(&#39;del&#39;, KEYS[1]) " +
          "else return 0 end";
  /**
   * 续约lua脚本
   */
  private static final String RENEW_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call(&#39;expire&#39;, KEYS[1], ARGV[2]) else return 0 end";
  private final StringRedisTemplate redisTemplate;
  public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
      ScheduleTask task = new ScheduleTask(this, lockContentMap);
      // 启动定时任务
      ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
  }
  /**
   * 加锁
   * 取到锁加锁,取不到锁一直等待知道获得锁
   *
   * @param lockKey
   * @param requestId 全局唯一
   * @param expire   锁过期时间, 单位秒
   * @return
   */
  public boolean lock(String lockKey, String requestId, long expire) {
      log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
      for (; ; ) {
          // 判断是否已经有线程持有锁,减少redis的压力
          LockContent lockContentOld = lockContentMap.get(lockKey);
          boolean unLocked = null == lockContentOld;
          // 如果没有被锁,就获取锁
          if (unLocked) {
              long startTime = System.currentTimeMillis();
              // 计算超时时间
              long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
              String lockKeyRenew = lockKey + "_renew";
              RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
              List<String> keys = new ArrayList<>();
              keys.add(lockKey);
              keys.add(lockKeyRenew);
              Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
              if (null != lockExpire && lockExpire > 0) {
                  // 将锁放入map
                  LockContent lockContent = new LockContent();
                  lockContent.setStartTime(startTime);
                  lockContent.setLockExpire(lockExpire);
                  lockContent.setExpireTime(startTime + lockExpire * 1000);
                  lockContent.setRequestId(requestId);
                  lockContent.setThread(Thread.currentThread());
                  lockContent.setBizExpire(bizExpire);
                 lockContent.setLockCount(1);
                 lockContentMap.put(lockKey, lockContent);
                 log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                 return true;
             }
         }
         // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
         if (Thread.currentThread() == lockContentOld.getThread()
                   && requestId.equals(lockContentOld.getRequestId())){
             // 计数 +1
             lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
             return true;
         }
         // 如果被锁或获取锁失败,则等待100毫秒
         try {
             TimeUnit.MILLISECONDS.sleep(100);
         } catch (InterruptedException e) {
             // 这里用lombok 有问题
             log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
             return false;
         }
     }
 }
 /**
  * 解锁
  *
  * @param lockKey
  * @param lockValue
  */
 public boolean unlock(String lockKey, String lockValue) {
     String lockKeyRenew = lockKey + "_renew";
     LockContent lockContent = lockContentMap.get(lockKey);
     long consumeTime;
     if (null == lockContent) {
         consumeTime = 0L;
     } else if (lockValue.equals(lockContent.getRequestId())) {
         int lockCount = lockContent.getLockCount();
         // 每次释放锁, 计数 -1,减到0时删除redis上的key
         if (--lockCount > 0) {
             lockContent.setLockCount(lockCount);
             return false;
         }
         consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
     } else {
         log.info("释放锁失败,不是自己的锁。");
         return false;
     }
     // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
     lockContentMap.remove(lockKey);
     RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
     List<String> keys = new ArrayList<>();
     keys.add(lockKey);
     keys.add(lockKeyRenew);
     Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
             Long.toString(lockContent.getBizExpire()));
     return EXEC_SUCCESS.equals(result);
 }
 /**
  * 续约
  *
  * @param lockKey
  * @param lockContent
  * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
  */
 public boolean renew(String lockKey, LockContent lockContent) {
     // 检测执行业务线程的状态
     Thread.State state = lockContent.getThread().getState();
     if (Thread.State.TERMINATED == state) {
         log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
         return false;
     }
     String requestId = lockContent.getRequestId();
     long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
     RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
     List<String> keys = new ArrayList<>();
     keys.add(lockKey);
     Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
     log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
     return EXEC_SUCCESS.equals(result);
 }
 static class ScheduleExecutor {
     public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
         long delay = unit.toMillis(initialDelay);
         long period_ = unit.toMillis(period);
         // 定时执行
         new Timer("Lock-Renew-Task").schedule(task, delay, period_);
     }
 }
 static class ScheduleTask extends TimerTask {
     private final RedisDistributionLockPlus redisDistributionLock;
     private final Map<String, LockContent> lockContentMap;
     public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
         this.redisDistributionLock = redisDistributionLock;
         this.lockContentMap = lockContentMap;
     }
     @Override
     public void run() {
         if (lockContentMap.isEmpty()) {
             return;
         }
         Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
         for (Map.Entry<String, LockContent> entry : entries) {
             String lockKey = entry.getKey();
             LockContent lockContent = entry.getValue();
             long expireTime = lockContent.getExpireTime();
             // 减少线程池中任务数量
             if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                 //线程池异步续约
                 ThreadPool.submit(() -> {
                     boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                     if (renew) {
                         long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                         lockContent.setExpireTime(expireTimeNew);
                     } else {
                         // 续约失败,说明已经执行完 OR redis 出现问题
                         lockContentMap.remove(lockKey);
         
           }
                 });
             }
         }
     }
 }
}

五、redis主从复制的坑

redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。

redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。

如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。A客户端错误地认为它在旧的master节点上成功加锁,但实际上锁已经被B客户端在新的master节点上加上了。

此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。

至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。

小结一下:上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。

위 내용은 Redis 분산 잠금을 갱신하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제