ホームページ  >  記事  >  データベース  >  分散システムにおける Redis ベースの分散ロックについて話しましょう

分散システムにおける Redis ベースの分散ロックについて話しましょう

青灯夜游
青灯夜游転載
2021-10-29 10:52:111478ブラウズ

ロックされていますが、同時実行の問題はありますか? Redis分散ロックを本当に理解していますか?次の記事では、分散システムにおける Redis ベースの分散ロックについて説明します。

分散システムにおける Redis ベースの分散ロックについて話しましょう

#新しく引き継いだプロジェクトでは、アカウントが不均等であるという問題が発生することがあります。退職前の前任技術上司の説明は、「調査した結果、原因が見つからなかった。その後、忙しくて解決できなかった。フレームワークのせいかもしれない…」でした。 #プロジェクトが納品された今、このような問題を解決する必要があります。すべての会計処理ロジックを整理した結果、最終的にその原因がわかりました。ホット アカウントでのデータベースの同時操作が原因でした。この問題に関して、分散システムにおける Redis に基づく分散ロックについて話しましょう。ちなみに、問題の原因と解決策も詳しく説明します。 [関連する推奨事項:

Redis ビデオ チュートリアル

]

原因分析

システムの同時実行性は高くなく、ホット アカウントが存在しますが、そこまで深刻ではありません。問題の根本は、人為的に同時実行性を生み出すシステム アーキテクチャ設計にあります。シナリオは次のとおりです。販売者はデータのバッチをバッチでインポートし、システムは前処理を実行してアカウント残高を増減します。

この時点で、スケジュールされた別のタスクもアカウントをスキャンして更新します。さらに、同じアカウントに対する操作がさまざまなシステムに分散され、ホットアカウントが出現します。

この問題をアーキテクチャ レベルで解決するには、会計システムを切り離し、1 つのシステムに集中して処理することを検討できます。すべてのデータベース トランザクションと実行シーケンスは会計システムによって調整され、処理されます。技術的な観点から見ると、ホットスポット アカウントはロック メカニズムを通じてロックできます。

この記事では、ホット アカウントの分散ロックの実装について詳しく説明します。

ロックの分析

Java のマルチスレッド環境では、通常、使用できるロックのタイプがいくつかあります。

JVMメモリ モデル レベルのロック。一般的に使用されるロックには、同期ロックなど;
  • データベース ロック (楽観的ロック、悲観的ロックなど);
  • 分散ロック;
  • JVM メモリ レベル ロックは、複数のスレッドがグローバル変数にアクセス/変更する場合など、単一サービスの下でスレッド セキュリティを確保できます。ただし、システムがクラスターにデプロイされている場合、JVM レベルのローカル ロックは無力です。

悲観的ロックと楽観的ロック

上記の場合と同様、ホット アカウントは分散システム内の共有リソースであり、通常は

データベース ロック#を使用します。 ## または

分散ロック を使用して問題を解決します。 データベース ロックは、楽観的ロック

悲観的ロックに分割されます。 悲観的ロック

は、データベース (Mysql の InnoDB) によって提供される排他的ロックに基づいて実装されます。 select ... for update ステートメントを通じてトランザクション操作を実行するとき、MySQL はクエリ結果セット内のデータの各行に排他的ロックを追加し、他のスレッドはレコードの更新および削除操作をブロックします。共有リソースの順次実行 (変更) を実現するために、

オプティミスティック ロック

は悲観的ロックに対して相対的なものです。オプティミスティック ロックは、通常、データが競合を引き起こさないことを前提としているため、データは送信および更新されて初めて、データの競合が正式に検出されます。競合がある場合、例外情報がユーザーに返され、ユーザーが何をすべきかを決定できるようになります。オプティミスティック ロックは、読み取りが多く書き込みが少ないシナリオに適しており、プログラムのスループットを向上させることができます。楽観的ロックは通常、記録ステータスまたはバージョンの追加に基づいて実装されます。

悲観的ロックの失敗シナリオ

プロジェクトで悲観的ロックが使用されましたが、悲観的ロックは失敗しました。これは悲観的ロックを使用するときによくある誤解でもあるので、以下で分析してみましょう。

悲観的ロック プロセスの通常の使用:

更新のために選択...を通じてレコードをロックします;

    新しい残高を計算し、金額を変更して保存します。 ;
  • 実行完了後にロックを解除する;
  • よくあるミスの処理手順:

口座残高を照会し、新しい残高を計算する;

    更新ロック レコードの選択 .. . を通じて;
  • 量を変更して保存します;
  • 実行完了後にロックを解放します;
  • #間違ったプロセス (A サービスと B サービスによってクエリされる内容など) 残高はすべて 100、A は 50 を差し引き、B は 40 を差し引き、A はレコードをロックしてデータベースを 50 に更新します。A がロックを解放した後、Bレコードをロックし、データベースを 60 に更新します。明らかに、後者は前者の更新を上書きしています。解決策は、ロックの範囲を拡張し、新しい残高を計算する前にロックを進めることです。
  • 通常、悲観的ロックはデータベースに多大な負荷をかけますが、実際にはシナリオに応じて楽観的ロックまたは分散ロックが使用されます。

本題に入り、Redis に基づく分散ロックの実装について話しましょう。

Redis 分散ロックの実践演習

ここでは、Spring Boot、Redis、Lua スクリプトを例として、分散ロックの実装を示します。処理を簡略化するために、例の Redis は分散ロックの機能とデータベースの機能の両方を想定しています。

シナリオ構築

クラスタ環境で同一アカウントの金額を操作する基本手順:

  • データベースからユーザー金額を読み込み、同一アカウントの金額を操作します。
  • プログラムは金額を変更します;
  • その後、最新の金額がデータベースに保存されます;

以下はロックせず、最初から同期処理します。徐々に最終的な分布ロックを推測します。

基本的な統合とクラス構築

ロックのない基本的なビジネス環境を準備します。

まず、関連する依存関係を Spring Boot プロジェクトに導入します:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

アカウントに対応するエンティティ クラス UserAccount:

public class UserAccount {

  //用户ID
  private String userId;
  //账户内金额
  private int amount;

  //添加账户金额
  public void addAmount(int amount) {
    this.amount = this.amount + amount;
  }
  // 省略构造方法和getter/setter 
}

スレッド実装クラス AccountOperationThread を作成します:

public class AccountOperationThread implements Runnable {

  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS = 1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
    this.userId = userId;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    noLock();
  }

  /**
   * 不加锁
   */
  private void noLock() {
    try {
      Random random = new Random();
      // 模拟线程进行业务处理
      TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //模拟数据库中获取用户账号
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
    // 金额+1
    userAccount.addAmount(1);
    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
    //模拟存回数据库
    redisTemplate.opsForValue().set(userId, userAccount);
  }
}

RedisTemplate のインスタンス化が Spring Boot に渡されます:

@Configuration
public class RedisConfig {

  @Bean
  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    // 设置value的序列化规则和 key的序列化规则
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.afterPropertiesSet();
    return redisTemplate;
  }
}

最後に、マルチスレッド操作をトリガーする TestController を準備します:

@RestController
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService = Executors.newFixedThreadPool(10);

  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")
  public String test() throws InterruptedException {
    // 初始化用户user_001到Redis,账户金额为0
    redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
    // 开启10个线程进行同步测试,每个线程为账户增加1元
    for (int i = 0; i < 10; i++) {
      logger.info("创建线程i=" + i);
      executorService.execute(new AccountOperationThread("user_001", redisTemplate));
    }

    // 主线程休眠1秒等待线程跑完
    TimeUnit.MILLISECONDS.sleep(1000);
    // 查询Redis中的user_001账户
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
    logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
    return "success";
  }
}

上記のプログラムを実行します (通常は 10 スレッド)。各スレッドは追加します。 1 の場合、結果は 10 になるはずです。しかし、何度か実行すると、結果は大きく異なり、基本的には 10 未満であることがわかります。

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5

上記のログを例にとると、最初の 4 つのスレッドはすべて値を 1 に変更しました。これは、次の 3 つのスレッドが以前の変更を上書きし、最終結果が 10 ではなく 5 だけになったことを意味します。 。これには明らかに問題があります。

Redis 同期ロックの実装

上記の状況を考慮すると、同じ JVM 内でスレッド ロックを通じて完了できます。ただし、分散環境では JVM レベルのロックを実装できませんが、ここでは Redis 同期ロックを使用できます。

基本的な考え方: 最初のスレッドが入ったとき、Redis にレコードが入ります。後続のスレッドがリクエストしてきたときに、そのレコードが Redis に存在するかどうかを判断します。存在する場合は、そのレコードが存在することを意味します。ロック状態になり、待機または戻ります。存在しない場合は、後続の業務処理が行われます。

  /**
   * 1.抢占资源时判断是否被锁。
   * 2.如未锁则抢占成功且加锁,否则等待锁释放。
   * 3.业务完成后释放锁,让给其它线程。
   * <p>
   * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
   */
  private void redisLock() {
    Random random = new Random();
    try {
      TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    while (true) {
      Object lock = redisTemplate.opsForValue().get(userId + ":syn");
      if (lock == null) {
        // 获得锁 -> 加锁 -> 跳出循环
        logger.info(Thread.currentThread().getName() + ":获得锁");
        redisTemplate.opsForValue().set(userId + ":syn", "lock");
        break;
      }
      try {
        // 等待500毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }

while コード ブロックでは、まず対応するユーザー ID が Redis に存在するかどうかを確認し、存在しない場合はセット ロックを実行し、存在する場合はループを抜けて待機を続けます。

上記のコードはロック機能を実装しているように見えますが、プログラムを実行すると、ロックされていない場合と同様に同時実行の問題が依然として存在することがわかります。その理由は、取得とロックの操作がアトミックではないためです。たとえば、2 つのスレッドがロックが両方とも null であることを検出してロックするとしますが、この時点でも同時実行の問題は依然として存在します。

Redis アトミック同期ロック

上記の問題に対処するために、取得とロックのプロセスをアトミック化できます。これは、spring-boot-data-redis によって提供されるアトマイゼーション API に基づいて実装できます:

// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);

上記のメソッドのアトマイゼーション操作は、Redis の setnx コマンドのカプセル化です。Redis での setnx の使用は次のとおりです。次のように:

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"

mykey を初めて設定するときは、mykey が存在しないため、設定が成功したことを示す 1 が返されます。2 回目に mykey が設定されると、mykey はすでに存在し、0 が返されます。が返され、設定が失敗したことが示されます。 mykey に対応する値を再度クエリすると、それがまだ最初に設定された値であることがわかります。言い換えれば、redis の setnx は、一意のキーが 1 つのサービスによってのみ正常に設定されることを保証します。

上記の API と基礎となる原則を理解した後、次のようにスレッド内の実装メソッド コードを見てみましょう:

  /**
   * 1.原子操作加锁
   * 2.竞争线程循环重试获得锁
   * 3.业务完成释放锁
   */
  private void atomicityRedisLock() {
    //Spring data redis 支持的原子性操作
    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
      try {
        // 等待100毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info(Thread.currentThread().getName() + ":获得锁");
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }

コードを再度実行すると、結果が正しいことがわかります。これは、配布が正常に完了できることを意味します。スレッドはロックされています。

Redis 分散ロックのデッドロック

上記のコードの実行結果は問題ありませんが、アプリケーションが異常終了すると、メソッドを実行する時間がなくなります。最終的にロックを解放すると、他のスレッドはこのロックを取得できなくなります。

この時点で、オーバーロードされた setIfAbsent メソッドを使用できます:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);

このメソッドに基づいて、ロックの有効期限を設定できます。これにより、ロックを取得したスレッドがダウンした場合でも、Redis 内のデータの有効期限が切れた後は、他のスレッドが正常にロックを取得できます。

サンプルコードは以下の通りです。

private void atomicityAndExRedisLock() {
    try {
      //Spring data redis 支持的原子性操作,并设置5秒过期时间
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
          System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁--------");
      // 应用在这里宕机,进程退出,无法执行 finally;
      Thread.currentThread().interrupt();
      // 业务逻辑...
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁
      if (!Thread.currentThread().isInterrupted()) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁");
      }
    }
  }

ビジネスタイムアウトとデーモンスレッド

上にRedisのタイムアウト時間を追加することで解決するようです問題は解決しましたが、新たな問題が発生しました。

たとえば、通常の状況では、スレッド A は 5 秒以内にビジネスを完了できますが、場合によっては 5 秒以上かかることがあります。タイムアウトが 5 秒に設定されている場合、スレッド A がロックを取得しますが、ビジネス ロジックの処理には 6 秒かかります。この時点では、スレッド A はまだ通常のビジネス ロジックを実行しており、スレッド B はロックを取得しています。スレッド A の処理が終了すると、スレッド B のロックを解放できます。

上記のシナリオには 2 つの問題があります。

  • まず、スレッド A とスレッド B が同時に実行される可能性があり、同時実行の問題が発生します。
  • 2 番目に、スレッド A がスレッド B のロックを解放し、一連の悪循環を引き起こす可能性があります。

もちろん、Redis に値を設定することで、ロックがスレッド A に属するかスレッド B に属するかを判断できます。ただし、注意深く分析すると、この問題の本質は、スレッド A がビジネス ロジックの実行にロック タイムアウトよりも長い時間がかかることであることがわかります。

その場合、解決策は 2 つあります。

  • まず、ロックが解放される前にビジネス コードが実行できるように、タイムアウトを十分な長さに設定します。
  • 2 番目、ロックのデーモン スレッドを追加し、有効期限が近づいているが解放されていないロックの時間を追加します。

最初の方法では、ほとんどの場合、銀行全体の時間のかかるビジネス ロジックが必要です。タイムアウト設定。

2 番目の方法は、次のデーモン スレッド メソッドを通じてロック タイムアウトを動的に増加することです。

public class DaemonThread implements Runnable {
  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守护 主线程关闭则结束守护线程
  private volatile boolean daemon = true;
  // 守护锁
  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
    this.lockKey = lockKey;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    try {
      while (daemon) {
        long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
        // 剩余有效期小于1秒则续命
        if (time < 1000) {
          logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
          redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(300);
      }
      logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 主线程主动调用结束
  public void stop() {
    daemon = false;
  }
}

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
      String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
      if (value.equals(result)) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁-----");
      }
      //关闭守护线程
      if (daemonThread != null) {
        daemonThread.stop();
      }
    }
  }

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration
public class RedisConfig {

  //lock script
  private static final String LOCK_SCRIPT = " if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 " +
      " then redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) " +
      " return 1 " +
      " else return 0 end ";
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call" +
      "(&#39;del&#39;, KEYS[1]) else return 0 end";

  // ... 省略部分代码
  
  @Bean
  public DefaultRedisScript<Boolean> lockRedisScript() {
    DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Boolean.class);
    defaultRedisScript.setScriptText(LOCK_SCRIPT);
    return defaultRedisScript;
  }

  @Bean
  public DefaultRedisScript<Long> unlockRedisScript() {
    DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Long.class);
    defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
    return defaultRedisScript;
  }
}

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

  private void deamonRedisLockWithLua() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
        // 等待1000毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      logger.error("异常", e);
    } finally {
      //使用Lua脚本:先判断是否是自己设置的锁,再执行删除
      // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
      Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
      logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
      if (RELEASE_SUCCESS.equals(result)) {
        if (daemonThread != null) {
          //关闭守护线程
          daemonThread.stop();
          logger.info(Thread.currentThread().getName() + ":释放锁---");
        }
      }
    }
  }

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

以上が分散システムにおける Redis ベースの分散ロックについて話しましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。