>데이터 베이스 >Redis >분산 시스템의 Redis 기반 분산 잠금에 대해 이야기해 보겠습니다.

분산 시스템의 Redis 기반 분산 잠금에 대해 이야기해 보겠습니다.

青灯夜游
青灯夜游앞으로
2021-10-29 10:52:111612검색

잠겼는데 여전히 동시성 문제가 있나요? Redis분산 잠금을 정말로 이해하고 계신가요? 다음 기사에서는 분산 시스템의 Redis 기반 분산 잠금에 대해 설명하겠습니다. 도움이 되기를 바랍니다.

분산 시스템의 Redis 기반 분산 잠금에 대해 이야기해 보겠습니다.

새로 인수한 프로젝트에서는 계정이 고르지 못한 문제가 가끔 발생합니다. 떠나기 전 전 기술사장이 해준 설명은: 문제 해결 후에도 원인을 찾을 수 없었습니다. 그 후 너무 바빠서 프레임워크 때문일 수도 있습니다...

이제 프로젝트가 끝났습니다. 전달되었으므로 이러한 문제를 해결해야 합니다. 모든 회계 처리 로직을 정리한 후 마침내 원인을 찾았습니다. 이는 핫 계정에 대한 동시 데이터베이스 작업으로 인해 발생했습니다. 이 문제와 관련하여 분산 시스템의 Redis 기반 분산 잠금에 대해 이야기해 보겠습니다. 그런데 문제의 원인과 해결 방법도 분석합니다. [관련 추천 : Redis 영상 튜토리얼]

원인 분석

시스템 동시성도 높지 않고, 핫한 계정도 있지만 그렇게 심각하지는 않습니다. 문제의 근본 원인은 동시성을 인위적으로 생성하는 시스템 아키텍처 설계에 있습니다. 시나리오는 다음과 같습니다. 판매자가 일괄적으로 데이터를 가져오면 시스템이 전처리를 수행하고 계정 잔액을 늘리거나 줄입니다.

이때 또 다른 예약된 작업도 계정을 스캔하고 업데이트합니다. 또한, 동일한 계정의 작업이 다양한 시스템에 분산되어 핫 계정이 나타납니다.

이 문제를 해결하기 위해 아키텍처 수준에서 회계 시스템을 분리하고 이를 하나의 시스템에 중앙 집중화하여 처리하는 것을 고려할 수 있습니다. 모든 데이터베이스 트랜잭션과 실행 순서는 회계 시스템에 의해 조정되고 처리됩니다. 기술적인 관점에서 핫스팟 계정은 잠금 메커니즘을 통해 잠글 수 있습니다.

이 글에서는 핫 계정에 대한 분산 잠금 구현에 대해 자세히 설명합니다.

잠금 분석

Java의 다중 스레드 환경에는 일반적으로 사용할 수 있는 여러 유형의 잠금이 있습니다.

  • JVM 메모리 모델 수준 잠금, 일반적으로 사용되는 잠금은 동기화, 잠금 등입니다.
  • 낙관적 잠금, 비관적 잠금 등과 같은 데이터베이스 잠금
  • 분산 잠금
JVM 메모리 수준 잠금은 전역 변수에 액세스/수정하는 여러 스레드와 같은 단일 서비스에서 스레드의 보안을 보장할 수 있습니다. 그러나 시스템이 클러스터에 배포되면 JVM 수준 로컬 잠금은 무력화됩니다.

비관적 잠금 및 낙관적 잠금

위의 경우처럼 핫스팟 계정은 분산 시스템에서 공유 리소스로 이를 해결하기 위해 일반적으로

Database Lock 또는 Distributed Lock을 사용합니다.

데이터베이스 잠금은

낙관적 잠금비관적 잠금으로 구분됩니다.

비관적 잠금은 데이터베이스(Mysql의 InnoDB)에서 제공하는 단독 잠금을 기반으로 구현되었습니다. 트랜잭션 작업을 수행할 때 select...for update 문을 통해 MySQL은 쿼리 결과 집합의 각 데이터 행에 배타적 잠금을 추가하고 다른 스레드는 레코드의 업데이트 및 삭제 작업을 차단합니다. 공유 리소스의 순차적 실행(수정)을 달성하기 위해

낙관적 잠금은 비관적 잠금과 관련이 있습니다. 낙관적 잠금은 일반적으로 데이터가 충돌을 일으키지 않는다고 가정하므로 데이터가 제출되고 업데이트되었습니다. 충돌이 있는지 확인하세요. 충돌이 있는 경우 예외 정보가 사용자에게 반환되어 사용자가 무엇을 할지 결정할 수 있습니다. 낙관적 잠금은 읽기는 많고 쓰기는 적은 시나리오에 적합하여 프로그램 처리량을 향상시킬 수 있습니다. 낙관적 잠금은 일반적으로 기록 상태나 버전 추가에 따라 구현됩니다.

비관적 잠금 실패 시나리오

비관적 잠금이 프로젝트에 사용되었지만 비관적 잠금이 실패했습니다. 이는 비관적 잠금을 사용할 때 흔히 발생하는 오해이기도 합니다. 아래에서 분석해 보겠습니다.

비관적 잠금 프로세스의 일반적인 사용:

    업데이트를 위해 선택을 통해 레코드를 잠급니다.
  • 새 잔액을 계산하고 금액을 수정한 후 저장합니다.
  • 실행이 완료된 후 잠금을 해제합니다. 잦은 실수에 대한 처리 프로세스:
계정 잔액을 쿼리하고 새 잔액을 계산합니다.

업데이트를 위해 선택을 통해 기록을 잠급니다.
  • 금액을 수정하고 저장합니다.
  • 실행이 완료된 후 잠금을 해제합니다.
  • A, B 서비스 문의와 같은 잘못된 프로세스에서 도달한 잔액은 모두 100이고 A가 50을 차감하고 B가 40을 차감한 다음 A가 잠금을 해제한 후 데이터베이스를 50으로 업데이트합니다. 기록을 저장하고 데이터베이스를 60으로 업데이트합니다. 분명히 후자가 전자의 업데이트를 덮어썼습니다. 해결 방법은 잠금 범위를 확장하고 새 잔액을 계산하기 전에 잠금을 진행하는 것입니다.
  • 일반적으로 비관적 잠금은 데이터베이스에 많은 부담을 줍니다. 실제로는 시나리오에 따라 낙관적 잠금이나 분산 잠금을 사용하여 구현됩니다.

본론으로 가서 Redis 기반의 분산 잠금 구현에 대해 이야기해 보겠습니다.

Redis 분산 잠금 실습

여기에서는 분산 잠금 구현을 보여주기 위해 Spring Boot, Redis 및 Lua 스크립트를 예로 사용합니다. 처리를 단순화하기 위해 예제의 Redis는 분산 잠금 기능과 데이터베이스 기능을 모두 가정합니다.

시나리오 구성

클러스터 환경에서 기본 단계는 동일한 계정의 금액을 운영하는 것입니다.

  • 데이터베이스에서 사용자 금액을 읽습니다.
  • 프로그램이 금액을 수정합니다.
  • 그런 다음 최신 금액을 저장합니다.

다음은 잠금이 없는 비동기 처리로 시작하여 점차적으로 최종 분산 잠금을 추론합니다.

기본 통합 및 클래스 구축

잠김 없는 기본 비즈니스 환경을 준비합니다.

먼저 Spring Boot 프로젝트에 관련 종속성을 도입합니다.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Account 해당 엔터티 클래스 UserAccount:

public class UserAccount {

  //用户ID
  private String userId;
  //账户内金额
  private int amount;

  //添加账户金额
  public void addAmount(int amount) {
    this.amount = this.amount + amount;
  }
  // 省略构造方法和getter/setter 
}

스레드 구현 클래스 생성 AccountOperationThread:

public class AccountOperationThread implements Runnable {

  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS = 1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
    this.userId = userId;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    noLock();
  }

  /**
   * 不加锁
   */
  private void noLock() {
    try {
      Random random = new Random();
      // 模拟线程进行业务处理
      TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //模拟数据库中获取用户账号
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
    // 金额+1
    userAccount.addAmount(1);
    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
    //模拟存回数据库
    redisTemplate.opsForValue().set(userId, userAccount);
  }
}

RedisTemplate의 인스턴스화는 Spring Boot로 전달됩니다.

@Configuration
public class RedisConfig {

  @Bean
  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    // 设置value的序列化规则和 key的序列化规则
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.afterPropertiesSet();
    return redisTemplate;
  }
}

마지막으로, 다중 스레드 작업을 트리거하기 위한 TestController 준비:

@RestController
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService = Executors.newFixedThreadPool(10);

  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")
  public String test() throws InterruptedException {
    // 初始化用户user_001到Redis,账户金额为0
    redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
    // 开启10个线程进行同步测试,每个线程为账户增加1元
    for (int i = 0; i < 10; i++) {
      logger.info("创建线程i=" + i);
      executorService.execute(new AccountOperationThread("user_001", redisTemplate));
    }

    // 主线程休眠1秒等待线程跑完
    TimeUnit.MILLISECONDS.sleep(1000);
    // 查询Redis中的user_001账户
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
    logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
    return "success";
  }
}

위 프로그램을 실행합니다. 일반적으로 10개의 스레드가 있고 각 스레드는 1을 추가하며 결과는 10이 되어야 합니다. 하지만 여러 번 실행해 보면 결과가 크게 달라지며, 기본적으로 10보다 작은 것을 알 수 있습니다.

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5

위 로그를 예로 들면 처음 4개의 스레드가 모두 값을 1로 변경했습니다. 이는 다음 3개의 스레드가 이전 수정 사항을 덮어썼다는 의미이며 최종 결과는 10이 아니라 5에 불과합니다. 이것은 분명히 문제가 있습니다.

Redis 동기화 잠금 구현

위의 상황을 고려하여 동일한 JVM에서는 스레드 잠금을 통해 완료할 수 있습니다. 그러나 분산 환경에서는 JVM 수준 잠금을 구현할 수 없습니다. 여기서는 Redis 동기화 잠금을 사용할 수 있습니다.

기본 아이디어: 첫 번째 스레드가 들어오면 Redis에 레코드가 들어갑니다. 후속 스레드가 요청하면 해당 레코드가 Redis에 있는지 여부를 판단하여 잠긴 상태로 대기한다는 의미입니다. 아니면 반품. 존재하지 않는 경우에는 후속 업무 처리를 실시합니다.

  /**
   * 1.抢占资源时判断是否被锁。
   * 2.如未锁则抢占成功且加锁,否则等待锁释放。
   * 3.业务完成后释放锁,让给其它线程。
   * <p>
   * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
   */
  private void redisLock() {
    Random random = new Random();
    try {
      TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    while (true) {
      Object lock = redisTemplate.opsForValue().get(userId + ":syn");
      if (lock == null) {
        // 获得锁 -> 加锁 -> 跳出循环
        logger.info(Thread.currentThread().getName() + ":获得锁");
        redisTemplate.opsForValue().set(userId + ":syn", "lock");
        break;
      }
      try {
        // 等待500毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }

while 코드 블록에서 먼저 해당 사용자 ID가 Redis에 존재하는지 확인합니다. 존재하지 않으면 잠금을 설정하고 루프에서 빠져나와 계속 기다립니다.

위 코드는 잠금 기능을 구현한 것 같지만, 프로그램을 실행해 보면 여전히 잠겨 있지 않은 것처럼 동시성 문제가 발생하는 것을 발견할 수 있습니다. 그 이유는 획득 및 잠금 작업이 원자적이지 않기 때문입니다. 예를 들어 두 스레드가 잠금이 모두 null임을 확인하고 현재 동시성 문제가 여전히 존재합니다.

Redis 원자 동기화 잠금

위의 문제를 고려하여 획득 및 잠금 프로세스를 원자화할 수 있습니다. spring-boot-data-redis에서 제공하는 원자화 API를 기반으로 실현할 수 있습니다.

// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);

위 방법의 원자적 연산은 Redis의 setnx 명령을 캡슐화한 것입니다. Redis에서 setnx를 사용하는 방법은 다음과 같습니다.

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"

처음으로 mykey를 설정하면 존재하지 않고 1이 반환되어 설정이 성공했음을 나타내며 두 ​​번째로 mykey를 설정하면 이미 존재하며 0이 반환되어 이를 나타냅니다. 설정이 실패했습니다. mykey에 해당하는 값을 다시 쿼리해 보면 여전히 처음 설정한 값임을 알 수 있습니다. 즉, redis의 setnx는 고유 키가 하나의 서비스에서만 성공적으로 설정될 수 있도록 보장합니다.

위의 API와 기본 원리를 이해한 후 다음과 같이 스레드의 구현 메소드 코드를 살펴보겠습니다.

  /**
   * 1.原子操作加锁
   * 2.竞争线程循环重试获得锁
   * 3.业务完成释放锁
   */
  private void atomicityRedisLock() {
    //Spring data redis 支持的原子性操作
    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
      try {
        // 等待100毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info(Thread.currentThread().getName() + ":获得锁");
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }

코드를 다시 실행하면 결과가 올바른 것을 알 수 있습니다. 이는 분산 스레드가 성공적으로 잠겼습니다.

Redis 분산 잠금의 교착 상태

위 코드의 실행 결과는 괜찮지만, 애플리케이션이 비정상적으로 충돌하여 최종적으로 잠금을 해제하는 메서드를 실행할 시간이 없으면 다른 스레드는 절대 불가능합니다. 자물쇠를 얻으려면.

이때 setIfAbsent의 오버로드된 메소드를 사용할 수 있습니다:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);

이 메소드를 기반으로 잠금 만료 시간을 설정할 수 있습니다. 이렇게 하면 잠금을 획득한 스레드가 다운되더라도 Redis의 데이터가 만료된 후 다른 스레드가 정상적으로 잠금을 획득할 수 있습니다.

샘플 코드는 다음과 같습니다.

private void atomicityAndExRedisLock() {
    try {
      //Spring data redis 支持的原子性操作,并设置5秒过期时间
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
          System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁--------");
      // 应用在这里宕机,进程退出,无法执行 finally;
      Thread.currentThread().interrupt();
      // 业务逻辑...
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁
      if (!Thread.currentThread().isInterrupted()) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁");
      }
    }
  }

비즈니스 타임아웃 및 데몬 스레드

위에 Redis의 타임아웃 기간을 추가했는데, 이는 문제를 해결하는 것처럼 보이지만 새로운 문제를 야기하기도 합니다.

예를 들어 일반적인 상황에서는 스레드 A가 5초 이내에 업무를 완료할 수 있지만 경우에 따라 5초 이상이 걸릴 수도 있습니다. 제한 시간을 5초로 설정하면 스레드 A가 잠금을 획득하지만 비즈니스 로직 처리에는 6초가 걸립니다. 현재 스레드 A는 여전히 정상적인 비즈니스 논리를 수행하고 있으며 스레드 B는 잠금을 획득했습니다. 스레드 A의 처리가 완료되면 스레드 B의 잠금을 해제할 수 있습니다.

위 시나리오에는 두 가지 문제가 있습니다.

  • 첫째, 스레드 A와 스레드 B가 동시에 실행되어 동시성 문제가 발생할 수 있습니다.
  • 둘째, 스레드 A가 스레드 B의 잠금을 해제하여 일련의 악순환을 초래할 수 있습니다.

물론 Redis에서 값을 설정하여 잠금이 스레드 A에 속하는지 스레드 B에 속하는지 확인할 수 있습니다. 그러나 신중하게 분석하면 이 문제의 본질은 스레드 A가 잠금 시간 초과보다 비즈니스 논리를 실행하는 데 더 오랜 시간이 걸린다는 것입니다.

그런 다음 두 가지 해결 방법이 있습니다.

  • 첫째, 잠금이 해제되기 전에 비즈니스 코드가 실행될 수 있도록 충분히 긴 시간 초과를 설정합니다.
  • 두 번째, 잠금이 만료되는 것을 방지하기 위해 잠금용 데몬 스레드를 추가합니다. 잠금이 해제되었지만 해제되지 않은 경우 시간이 늘어납니다.

첫 번째 방법은 대부분의 경우 전체 은행의 시간이 많이 걸리는 비즈니스 로직이 필요하며 시간 초과가 설정됩니다.

두 번째 방법은 다음 데몬 스레드 방식을 통해 잠금 시간 초과를 동적으로 늘리는 것입니다.

public class DaemonThread implements Runnable {
  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守护 主线程关闭则结束守护线程
  private volatile boolean daemon = true;
  // 守护锁
  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
    this.lockKey = lockKey;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    try {
      while (daemon) {
        long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
        // 剩余有效期小于1秒则续命
        if (time < 1000) {
          logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
          redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(300);
      }
      logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 主线程主动调用结束
  public void stop() {
    daemon = false;
  }
}

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
      String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
      if (value.equals(result)) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁-----");
      }
      //关闭守护线程
      if (daemonThread != null) {
        daemonThread.stop();
      }
    }
  }

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration
public class RedisConfig {

  //lock script
  private static final String LOCK_SCRIPT = " if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 " +
      " then redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) " +
      " return 1 " +
      " else return 0 end ";
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call" +
      "(&#39;del&#39;, KEYS[1]) else return 0 end";

  // ... 省略部分代码
  
  @Bean
  public DefaultRedisScript<Boolean> lockRedisScript() {
    DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Boolean.class);
    defaultRedisScript.setScriptText(LOCK_SCRIPT);
    return defaultRedisScript;
  }

  @Bean
  public DefaultRedisScript<Long> unlockRedisScript() {
    DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Long.class);
    defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
    return defaultRedisScript;
  }
}

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

  private void deamonRedisLockWithLua() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
        // 等待1000毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      logger.error("异常", e);
    } finally {
      //使用Lua脚本:先判断是否是自己设置的锁,再执行删除
      // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
      Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
      logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
      if (RELEASE_SUCCESS.equals(result)) {
        if (daemonThread != null) {
          //关闭守护线程
          daemonThread.stop();
          logger.info(Thread.currentThread().getName() + ":释放锁---");
        }
      }
    }
  }

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

위 내용은 분산 시스템의 Redis 기반 분산 잠금에 대해 이야기해 보겠습니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제