ホームページ  >  記事  >  データベース  >  Redis を使用して SpringBoot に分散ロックを実装する方法

Redis を使用して SpringBoot に分散ロックを実装する方法

WBOY
WBOY転載
2023-06-03 08:16:321505ブラウズ

1. Redis が実装する分散ロックの原理

分散ロックが必要な理由

分散ロックについて話す前に、なぜ分散ロックが必要なのかを説明する必要があります。 分散ロック

分散ロック、スタンドアロン ロックと比較します。マルチスレッド プログラムを作成する場合、共有変数を同時に操作することによって引き起こされるデータの問題を回避します。通常はロックを使用して相互に排除し、確実なロックを実現します。共有変数.プロパティの正当性、その使用範囲は同じプロセス内です。共有リソースを同時に操作する必要があるプロセスが複数ある場合、どうすれば相互排他的になるのでしょうか?今日のビジネス アプリケーションは通常マイクロサービス アーキテクチャであり、これは 1 つのアプリケーションが複数のプロセスをデプロイすることも意味します。複数のプロセスが MySQL の同じレコード行を変更する必要がある場合、順序の乱れた操作によって引き起こされるダーティ データを避けるために、分散が必要です。今回導入するスタイルはロックされています。

Redis を使用して SpringBoot に分散ロックを実装する方法

#分散ロックを実装したい場合は、外部システムを使用する必要があります。すべてのプロセスはこのシステムにアクセスしてロックを申請します。この外部システムは相互に排他的である必要があります。つまり、2 つのリクエストが同時に到着した場合、システムは 1 つのプロセスのみを正常にロックし、もう 1 つのプロセスは失敗します。この外部システムにはデータベース、Redis、Zookeeper のいずれかを使用できますが、パフォーマンスを追求するために、通常は Redis または Zookeeper を使用することを選択します。

Redis は共有ストレージ システムとして使用でき、複数のクライアントがアクセスを共有できるため、分散ロックの保存に使用できます。さらに、Redis は高い読み取りおよび書き込みパフォーマンスを備えており、同時実行性の高いロック操作シナリオを処理できます。この記事の焦点は、Redis を使用して分散ロックを実装する方法を紹介し、実装プロセス中に発生する可能性のある問題について説明することです。

分散ロックの実装方法

分散ロックの実装における共有ストレージ システムとして、Redis はキーと値のペアを使用してロック変数を保存し、受信して処理できます。さまざまなクライアントから送信されるロックとロックの解放の操作リクエスト。では、キーと値のペアのキーと値はどのように決定されるのでしょうか?ロック変数に変数名を付け、この変数名をキーと値のペアのキーとして使用する必要があり、ロック変数の値はキーと値のペアの値になります。このようにして、Redis はロック変数とクライアントは、Redis コマンド操作を通じてロック操作を実装できます。

分散ロックを実装するには、Redis に相互排他機能が必要です。 SETNX コマンドを使用できます。これは SET IF NOT EXIST を意味します。つまり、キーが存在しない場合はその値が設定され、それ以外の場合は何も行われません。分散ロックは、2 つのクライアント プロセスにコマンドを相互に排他的に実行させることで実装されます。

以下は、キーと値のペアを使用してロック変数を保存する Redis の操作プロセスと、同時にロックを要求する 2 つのクライアントを示しています。

Redis を使用して SpringBoot に分散ロックを実装する方法

#ロック操作が完了すると、ロックに成功したクライアントは共有リソースを操作できるようになります。たとえば、MySQL の特定のデータ行を変更できます。操作が完了したら、後発者に共有リソースを操作する機会を与えるために、ロックを時間内に解放する必要があります。ロックを解除するにはどうすればよいですか?このキーを削除するには、DEL コマンドを使用するだけです。ロジックは非常にシンプルで、全体の処理を擬似コードで記述すると以下のようになります。

// 加锁
SETNX lock_key 1
// 业务逻辑
DO THINGS
// 释放锁
DEL lock_key

ただし、上記の実装には大きな問題があり、クライアント 1 がロックを取得したときに、次のような状況が発生するとデッドロックが発生します。

プログラムはビジネス ロジック例外を処理し、時間内にロックを解放できません。プロセスがハングし、ロックを解放する機会がありません。

上記の状況により、クライアントがlock を使用すると、ロックを永久に占有し、他のクライアントは決してそのロックを取得できなくなります。

デッドロックを回避する方法

上記のデッドロックの問題を解決するために考えられる最も簡単な解決策は、ロックを申請するときにそのロックにロックを設定することです。有効期限は、共有リソースの操作時間が 10 秒を超えないと仮定し、ロックする場合は、このキーの有効期限を 10 秒に設定するだけです。

しかし、上記の操作にはまだ問題があります。ロックして有効期限を設定するコマンドは 2 つあります。最初のコマンドだけが実行され、2 番目のコマンドは実行に失敗する可能性があります、例:

1. SETNX は正常に実行されましたが、ネットワークの問題により EXPIRE は失敗しました。
2. SETNX は正常に実行されましたが、Redis が異常にクラッシュし、EXPIRE にはチャンスがありませんでした
3. SETNX は正常に実行され、顧客は端末が異常にクラッシュし、EXPIRE を実行する機会がありませんでした

つまり、これら 2 つのコマンドがアトミックであることが保証できない場合は、操作を実行すると、有効期限の設定が失敗し、デッドロックの問題が依然として発生する可能性がある潜在的なリスクがあります。幸いなことに、Redis 2.6.12 以降、Redis は SET コマンドのパラメーターを拡張しました。SET と同時に EXPIRE 時間を指定できます。この操作はアトミックです。たとえば、次のコマンドはロックの有効期限を 10 秒に設定します。

SET lock_key 1 EX 10 NX

これまでのところ、デッドロックの問題は解決されましたが、他にも問題がまだ残っています。次のシナリオを想像してください:

Redis を使用して SpringBoot に分散ロックを実装する方法

    クライアント 1 は正常にロックされ、共有リソースの操作を開始します
  1. 客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)

  2. 客户端2加锁成功,开始操作共享资源

  3. 客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。

这里存在两个严重的问题:

  • 锁过期

  • 释放了别人的锁

第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景

第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?

锁被别人给释放了

解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。

//释放锁 比较unique_value是否相等,避免误释放
if redis.get("key") == unique_value then
    return redis.del("key")

这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性问题了。

  1. 客户端1执行GET,判断锁是自己的

  2. 客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)

  3. 客户端1执行DEL,却释放了客户端2的锁

由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成,这样一来GET+DEL之间就不会有其他命令执行了。

以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。

//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

最后我们执行以下命令,即可

redis-cli  --eval  unlock.script lock_key , unique_value

这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:

  1. 加锁时要设置过期时间SET lock_key unique_value EX expire_time NX

  2. 操作共享资源

  3. 释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁

有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。

如何确定锁的过期时间

前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间

Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。

Redis を使用して SpringBoot に分散ロックを実装する方法

那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。

Redis的部署方式对锁的影响

上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。

Redis发展到现在,几种常见的部署架构有:

  • #シングル モード;

  • マスター/スレーブ モード;

  • センチネル モード;

  • #クラスター モード;
  • Redis を使用する場合、
通常、マスター/スレーブ クラスター センチネル モードでデプロイします。センチネルの役割は、 redis ノードの動作ステータス。通常のマスター/スレーブ モードでは、マスターがクラッシュした場合、手動でスレーブをマスターに切り替える必要がありますが、マスターとスレーブのセンチネルの組み合わせを使用する利点は、マスターが異常にクラッシュした場合にセンチネルが自動フェイルオーバーを実装し、スレーブを新しいマスターに昇格させます。可用性

はサービスを提供し続けることで保証されます。それでは、マスターとスレーブの切り替えが発生したとき、分散ロックは依然として安全なのでしょうか?

#次のシナリオを想像してください: Redis を使用して SpringBoot に分散ロックを実装する方法

クライアント 1 がマスター上で SET コマンドを実行し、ロックが成功します
  1. 現時点では、マスターは異常ダウンしており、SET コマンドはまだスレーブに同期されていません (マスター/スレーブ レプリケーションは非同期です)
  2. Sentinel はスレーブを新しいマスターに昇格させますが、新しいマスターでロックが失われたため、クライアント 2 は正常にロックされますが、分散ロックは依然として影響を受ける可能性があります。 Redis がセンチネルを通じて高可用性を確保したとしても、何らかの理由でマスター ノードがマスターとスレーブを切り替えると、ロックは失われます。
  3. クラスターモード Redlock は信頼性の高い分散ロックを実装します

Redis インスタンスの障害によるロック失敗の問題を回避するために、Redis 開発者 Antirez は分散フォーミュラ ロック アルゴリズム Redlock を提案しました。 。 Redlock アルゴリズムの基本的な考え方は、クライアントと複数の独立した Redis インスタンスが順番にロックを要求できるようにすることです。クライアントが半分以上のインスタンスでロック操作を正常に完了できた場合、クライアントは成功したと見なされます。分散ロックが取得されると、ロックは失敗します

。このように、単一の Redis インスタンスに障害が発生した場合でも、ロック変数は他のインスタンスにも保存されるため、クライアントは引き続きロック操作を正常に実行でき、ロック変数が失われることはありません。

Redlock アルゴリズムの実行ステップを詳しく見てみましょう。 Redlock アルゴリズムを実装するには、Redis がセンチネル ノードを使用しないクラスター デプロイメント モードを採用し、N 個の独立した Redis インスタンス (公式には少なくとも 5 つのインスタンスが推奨されます) を必要とします。次に、ロック操作は 3 ステップで完了します。

最初のステップは、クライアントが現在時刻を取得することです。

2 番目のステップは、クライアントが N 個の Redis インスタンスに対してロック操作を順番に実行することです。

ここでのロック操作は、単一インスタンスで実行されるロック操作と同じです。NX、EX/PX オプション、およびクライアントの一意の識別子を指定して SET コマンドを使用します。もちろん、Redis インスタンスに障害が発生した場合でも Redlock アルゴリズムが確実に実行できるようにするには、ロック操作のタイムアウトを設定する必要があります。クライアントがタイムアウトになるまで Redis インスタンスでのロックの要求に失敗した場合、この時点でクライアントは次の Redis インスタンスでのロックを要求し続けます。一般に、ロック操作のタイムアウトは、ロックの有効時間のごく一部 (通常は約数十ミリ秒) に設定する必要があります。

Redis を使用して SpringBoot に分散ロックを実装する方法3 番目のステップでは、クライアントがすべての Redis インスタンスでロック操作を完了したら、クライアントはロック プロセス全体に費やした合計時間を計算する必要があります。

クライアントは、2 つの条件が満たされた場合にのみ、ロックが成功したとみなすことができます。条件 1 は、クライアントが半分以上 (N/2 1 以上) からロックを正常に取得したことです。 ) ; 2 番目の条件は、クライアントがロックを取得するのに費やした合計時間が、ロックの有効時間を超えないことです。

ほとんどのインスタンスが正常にロックされた場合にのみ、操作が成功したとみなされるのはなぜですか?実際、複数の Redis インスタンスを一緒に使用して分散システムを形成します。分散システムには異常なノードが常に存在するため、分散システムについて語るときは、システム全体の正常な動作に影響を与えずに、異常なノードがどれだけ存在するかを考慮する必要があります。これは分散システムにおけるフォールト トレランスの問題であり、この問題の結論は、障害のあるノードだけが存在する場合でも、ほとんどのノードが正常である限り、システム全体は依然として正しいサービスを提供できるということです。

これら 2 つの条件を満たした後、

ロックの有効時間を再計算する必要があります。計算の結果は、ロックの初期有効時間からクライアントがロックを取得するのに費やした合計時間を引いたものになります。ロック。ロックの有効期限が遅すぎて共有データ操作を完了できない場合は、共有リソース操作が完了する前にロックが期限切れになる状況を避けるためにロックを解放できます。

当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁

在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。

二、代码实现Redis分布式锁

1.SpringBoot整合redis用到最多的当然属于我们的老朋友RedisTemplate,pom依赖如下:

<!-- springboot整合redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.Redis配置类:

package com.example.redisdemo.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis配置类
 * @author Keson
 * @date 21:20 2022/11/14
 * @Param
 * @return
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 设置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

3.Service层面

package com.example.redisdemo.service;

import com.example.redisdemo.entity.CustomerBalance;
import java.util.concurrent.Callable;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO
 * @date 2022/11/14 15:12
 */
public interface RedisService {

    <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception;
}
package com.example.redisdemo.service.impl;

import com.example.redisdemo.entity.CustomerBalance;
import com.example.redisdemo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Redis实现分布式锁
 * @date 2022/11/14 15:13
 */
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {

    //设置默认过期时间
    private final static int DEFAULT_LOCK_EXPIRY_TIME = 20;
    //自定义lock key前缀
    private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{
        //自定义lock key
        String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode());
        //将UUID当做value,确保唯一性
        String lockReference = UUID.randomUUID().toString();

        try {
            if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) {
                throw new Exception("lock加锁失败");
            }
            return callable.call();
        } finally {
            unlock(lockKey, lockReference);
        }
    }

    //定义lock key
    String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) {
        return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode);
    }

    //redis加锁
    private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) {
        Boolean locked;
        try {
            //SET_IF_ABSENT --> NX: Only set the key if it does not already exist.
            //SET_IF_PRESENT --> XX: Only set the key if it already exist.
            locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8),
                            Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        } catch (Exception e) {
            log.error("Lock failed for redis key: {}, value: {}", key, value);
            locked = false;
        }
        return locked != null && locked;
    }

    //redis解锁
    private boolean unlock(String key, String value) {
        try {
            //使用lua脚本保证删除的原子性,确保解锁
            String script = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] " +
                            "then return redis.call(&#39;del&#39;, KEYS[1]) " +
                            "else return 0 end";
            Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
                            key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)));
            return unlockState == null || !unlockState;
        } catch (Exception e) {
            log.error("unLock failed for redis key: {}, value: {}", key, value);
            return false;
        }
    }
}

4.业务调用实现分布式锁示例:

    @Override
    public int updateById(CustomerBalance customerBalance) throws Exception {
        return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance));
    }

以上がRedis を使用して SpringBoot に分散ロックを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。