ロックされていますが、同時実行の問題はありますか? Redis分散ロックを本当に理解していますか?次の記事では、分散システムにおける Redis ベースの分散ロックについて説明します。
#新しく引き継いだプロジェクトでは、アカウントが不均等であるという問題が発生することがあります。退職前の前任技術上司の説明は、「調査した結果、原因が見つからなかった。その後、忙しくて解決できなかった。フレームワークのせいかもしれない…」でした。 #プロジェクトが納品された今、このような問題を解決する必要があります。すべての会計処理ロジックを整理した結果、最終的にその原因がわかりました。ホット アカウントでのデータベースの同時操作が原因でした。この問題に関して、分散システムにおける Redis に基づく分散ロックについて話しましょう。ちなみに、問題の原因と解決策も詳しく説明します。 [関連する推奨事項:
Redis ビデオ チュートリアル 原因分析この時点で、スケジュールされた別のタスクもアカウントをスキャンして更新します。さらに、同じアカウントに対する操作がさまざまなシステムに分散され、ホットアカウントが出現します。
この問題をアーキテクチャ レベルで解決するには、会計システムを切り離し、1 つのシステムに集中して処理することを検討できます。すべてのデータベース トランザクションと実行シーケンスは会計システムによって調整され、処理されます。技術的な観点から見ると、ホットスポット アカウントはロック メカニズムを通じてロックできます。
この記事では、ホット アカウントの分散ロックの実装について詳しく説明します。
ロックの分析分散ロック を使用して問題を解決します。 データベース ロックは、楽観的ロック
と悲観的ロックに分割されます。 悲観的ロック
は、データベース (Mysql の InnoDB) によって提供される排他的ロックに基づいて実装されます。 select ... for update ステートメントを通じてトランザクション操作を実行するとき、MySQL はクエリ結果セット内のデータの各行に排他的ロックを追加し、他のスレッドはレコードの更新および削除操作をブロックします。共有リソースの順次実行 (変更) を実現するために、オプティミスティック ロック
は悲観的ロックに対して相対的なものです。オプティミスティック ロックは、通常、データが競合を引き起こさないことを前提としているため、データは送信および更新されて初めて、データの競合が正式に検出されます。競合がある場合、例外情報がユーザーに返され、ユーザーが何をすべきかを決定できるようになります。オプティミスティック ロックは、読み取りが多く書き込みが少ないシナリオに適しており、プログラムのスループットを向上させることができます。楽観的ロックは通常、記録ステータスまたはバージョンの追加に基づいて実装されます。悲観的ロックの失敗シナリオ
更新のために選択...を通じてレコードをロックします;
口座残高を照会し、新しい残高を計算する;
本題に入り、Redis に基づく分散ロックの実装について話しましょう。
Redis 分散ロックの実践演習クラスタ環境で同一アカウントの金額を操作する基本手順:
以下はロックせず、最初から同期処理します。徐々に最終的な分布ロックを推測します。
ロックのない基本的なビジネス環境を準備します。
まず、関連する依存関係を Spring Boot プロジェクトに導入します:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
アカウントに対応するエンティティ クラス UserAccount:
public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
スレッド実装クラス AccountOperationThread を作成します:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
RedisTemplate のインスタンス化が Spring Boot に渡されます:
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
最後に、マルチスレッド操作をトリガーする TestController を準備します:
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
上記のプログラムを実行します (通常は 10 スレッド)。各スレッドは追加します。 1 の場合、結果は 10 になるはずです。しかし、何度か実行すると、結果は大きく異なり、基本的には 10 未満であることがわかります。
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
上記のログを例にとると、最初の 4 つのスレッドはすべて値を 1 に変更しました。これは、次の 3 つのスレッドが以前の変更を上書きし、最終結果が 10 ではなく 5 だけになったことを意味します。 。これには明らかに問題があります。
上記の状況を考慮すると、同じ JVM 内でスレッド ロックを通じて完了できます。ただし、分散環境では JVM レベルのロックを実装できませんが、ここでは Redis 同期ロックを使用できます。
基本的な考え方: 最初のスレッドが入ったとき、Redis にレコードが入ります。後続のスレッドがリクエストしてきたときに、そのレコードが Redis に存在するかどうかを判断します。存在する場合は、そのレコードが存在することを意味します。ロック状態になり、待機または戻ります。存在しない場合は、後続の業務処理が行われます。
/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * <p> * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
while コード ブロックでは、まず対応するユーザー ID が Redis に存在するかどうかを確認し、存在しない場合はセット ロックを実行し、存在する場合はループを抜けて待機を続けます。
上記のコードはロック機能を実装しているように見えますが、プログラムを実行すると、ロックされていない場合と同様に同時実行の問題が依然として存在することがわかります。その理由は、取得とロックの操作がアトミックではないためです。たとえば、2 つのスレッドがロックが両方とも null であることを検出してロックするとしますが、この時点でも同時実行の問題は依然として存在します。
上記の問題に対処するために、取得とロックのプロセスをアトミック化できます。これは、spring-boot-data-redis によって提供されるアトマイゼーション API に基づいて実装できます:
// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
上記のメソッドのアトマイゼーション操作は、Redis の setnx コマンドのカプセル化です。Redis での setnx の使用は次のとおりです。次のように:
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
mykey を初めて設定するときは、mykey が存在しないため、設定が成功したことを示す 1 が返されます。2 回目に mykey が設定されると、mykey はすでに存在し、0 が返されます。が返され、設定が失敗したことが示されます。 mykey に対応する値を再度クエリすると、それがまだ最初に設定された値であることがわかります。言い換えれば、redis の setnx は、一意のキーが 1 つのサービスによってのみ正常に設定されることを保証します。
上記の API と基礎となる原則を理解した後、次のようにスレッド内の実装メソッド コードを見てみましょう:
/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
コードを再度実行すると、結果が正しいことがわかります。これは、配布が正常に完了できることを意味します。スレッドはロックされています。
上記のコードの実行結果は問題ありませんが、アプリケーションが異常終了すると、メソッドを実行する時間がなくなります。最終的にロックを解放すると、他のスレッドはこのロックを取得できなくなります。
この時点で、オーバーロードされた setIfAbsent メソッドを使用できます:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
このメソッドに基づいて、ロックの有効期限を設定できます。これにより、ロックを取得したスレッドがダウンした場合でも、Redis 内のデータの有効期限が切れた後は、他のスレッドが正常にロックを取得できます。
サンプルコードは以下の通りです。
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
上にRedisのタイムアウト時間を追加することで解決するようです問題は解決しましたが、新たな問題が発生しました。
たとえば、通常の状況では、スレッド A は 5 秒以内にビジネスを完了できますが、場合によっては 5 秒以上かかることがあります。タイムアウトが 5 秒に設定されている場合、スレッド A がロックを取得しますが、ビジネス ロジックの処理には 6 秒かかります。この時点では、スレッド A はまだ通常のビジネス ロジックを実行しており、スレッド B はロックを取得しています。スレッド A の処理が終了すると、スレッド B のロックを解放できます。
上記のシナリオには 2 つの問題があります。
もちろん、Redis に値を設定することで、ロックがスレッド A に属するかスレッド B に属するかを判断できます。ただし、注意深く分析すると、この問題の本質は、スレッド A がビジネス ロジックの実行にロック タイムアウトよりも長い時間がかかることであることがわかります。
その場合、解決策は 2 つあります。
最初の方法では、ほとんどの場合、銀行全体の時間のかかるビジネス ロジックが必要です。タイムアウト設定。
2 番目の方法は、次のデーモン スレッド メソッドを通じてロック タイムアウトを動的に増加することです。
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。
主线程中相关代码实现:
private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。
在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。
首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:
@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
再通过在AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:
private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。
除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。
可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。
有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:
在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。
当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。
通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。
同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。
Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多编程相关知识,请访问:编程入门!!
以上が分散システムにおける Redis ベースの分散ロックについて話しましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。