>  기사  >  데이터 베이스  >  Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

WBOY
WBOY앞으로
2023-06-03 08:16:321505검색

1. Redis는 분산 잠금의 원칙을 구현합니다

분산 잠금이 필요한 이유

분산 잠금에 대해 이야기하기 전에 분산 잠금이 필요한 이유를 설명할 필요가 있습니다. 分布式锁

与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上申请加锁。这个外部系统必须具有互斥能力,也就是说,如果两个请求同时到达,系统只会成功地为一个进程加锁,而另一个进程会失败。这个外部系统可以是数据库,也可以是Redis或Zookeeper,但为了追求性能,我们通常会选择使用Redis或Zookeeper来做。

Redis可以作为一个共享存储系统,多个客户端可以共享访问,因此可以被用来保存分布式锁。而且 Redis 的读写性能高,可以应对高并发的锁操作场景。这篇文章的重点在于介绍如何使用Redis实现分布式锁,并探讨在实现过程中可能会遇到的问题。

分布式锁如何实现

作为分布式锁实现过程中的共享存储系统,Redis可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis就能保存锁变量了,客户端也就可以通过Redis的命令操作来实现锁操作。

想要实现分布式锁,必须要求Redis有互斥的能力。可以使用SETNX命令,其含义是SET IF NOT EXIST,即如果key不存在,才会设置它的值,否则什么也不做。实现一种分布式锁的方法是,两个客户端进程互斥地执行该命令。

以下展示了Redis使用key/value对保存锁变量,以及两个客户端同时请求加锁的操作过程。

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?直接使用DEL命令删除这个key即可。这个逻辑非常简单,整体的流程写成伪代码就是下面这样。

// 加锁
SETNX lock_key 1
// 业务逻辑
DO THINGS
// 释放锁
DEL lock_key

但是,以上实现存在一个很大的问题,当客户端1拿到锁后,如果发生下面的场景,就会造成死锁。

程序处理业务逻辑异常,没及时释放锁进程挂了,没机会释放锁

以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。

如何避免死锁

为了解决以上死锁问题,最容易想到的方案是在申请锁时,在Redis中实现时,给锁设置一个过期时间,假设操作共享资源的时间不会超过10s,那么加锁时,给这个key设置10s过期即可。

但以上操作还是有问题,加锁、设置过期时间是2条命令,有可能只执行了第一条,第二条却执行失败,例如:

1.SETNX执行成功,执行EXPIRE时由于网络问题,执行失败
2.SETNX执行成功,Redis异常宕机,EXPIRE没有机会执行
3.SETNX执行成功,客户端异常崩溃,EXPIRE没有机会执行

总之这两条命令如果不能保证是原子操作,就有潜在的风险导致过期时间设置失败,依旧有可能发生死锁问题

분산 잠금의 반대는 독립형 잠금입니다. 멀티 스레드 프로그램을 작성할 때, 공유 변수를 동시에 작동하여 발생하는 데이터 문제를 방지하기 위해 일반적으로 잠금을 사용하여 정확성을 보장합니다. 공유 변수의 용도 범위는 동일한 프로세스 내에 있습니다. 공유 리소스를 동시에 운영해야 하는 여러 프로세스가 있는 경우 어떻게 상호 배타적일 수 있습니까? 오늘날의 비즈니스 애플리케이션은 일반적으로 마이크로서비스 아키텍처입니다. 이는 하나의 애플리케이션이 여러 프로세스를 배포한다는 의미이기도 합니다. 여러 프로세스가 MySQL에서 동일한 레코드 행을 수정해야 하는 경우 잘못된 작업으로 인해 발생하는 더티 데이터를 방지하려면 배포가 필요합니다. 현재 소개할 스타일은 잠겨 있습니다.

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

Think 분산 잠금을 구현하려면 외부 시스템을 사용해야 하며 모든 프로세스가 이 시스템으로 이동하여 잠금을 적용합니다. 이 외부 시스템은 상호 배타적이어야 합니다. 즉, 두 요청이 동시에 도착하면 시스템은 한 프로세스만 성공적으로 잠그고 다른 프로세스는 실패합니다. 이러한 외부 시스템은 데이터베이스, Redis 또는 Zookeeper가 될 수 있지만, 성능을 추구하기 위해 일반적으로 Redis 또는 Zookeeper를 사용합니다.

Redis는 공유 스토리지 시스템으로 사용할 수 있으며, 여러 클라이언트가 액세스를 공유할 수 있으므로 분산 잠금을 저장하는 데 사용할 수 있습니다. 또한 Redis는 읽기 및 쓰기 성능이 뛰어나며 동시성이 높은 잠금 작업 시나리오를 처리할 수 있습니다. 이 기사의 초점은 Redis를 사용하여 분산 잠금을 구현하는 방법을 소개하고 구현 프로세스 중에 발생할 수 있는 문제를 논의하는 것입니다.

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

분산 잠금 구현 방법
  1. 분산 잠금 구현의 공유 스토리지 시스템인 Redis는 키-값 쌍을 사용하여 잠금 변수를 저장하고 다른 클라이언트에서 보낸 잠금 및 해제 작업을 수신하고 처리할 수 있습니다. 그렇다면 키-값 쌍의 키와 값은 어떻게 결정되나요? 잠금 변수에 변수 이름을 지정하고 이 변수 ​​이름을 키-값 쌍의 키로 사용해야 하며, 잠금 변수의 값은 키-값 쌍의 값이 됩니다. 이런 식으로 Redis는 lock 변수 및 클라이언트는 Redis 명령 작업을 통해 잠금 작업을 구현할 수 있습니다.

    🎜분산 잠금을 구현하려면 Redis에 상호 배제 기능이 있어야 합니다. SET IF NOT EXIST를 의미하는 SETNX 명령을 사용할 수 있습니다. 즉, 키가 없으면 해당 값이 설정되고 그렇지 않으면 아무 작업도 수행되지 않습니다. 분산 잠금은 두 클라이언트 프로세스가 상호 배타적인 명령을 실행하도록 하여 구현됩니다. 🎜🎜다음은 Redis가 키/값 쌍을 사용하여 잠금 변수를 저장하는 방법과 동시에 잠금을 요청하는 두 클라이언트의 작업 프로세스를 보여줍니다. 🎜🎜Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법🎜🎜Add 잠금 작업이 완료된 후 성공적으로 잠긴 클라이언트는 공유 리소스를 작업할 수 있습니다. 예를 들어 MySQL의 특정 데이터 행을 수정할 수 있습니다. 작업이 완료된 후, 후발자에게 공유 리소스를 운영할 수 있는 기회를 제공하기 위해 잠금을 제때 해제해야 합니다. 잠금을 해제하는 방법은 무엇입니까? 이 키를 삭제하려면 DEL 명령을 사용하세요. 로직은 매우 간단합니다. 의사 코드로 작성된 전체 프로세스는 다음과 같습니다. 🎜
    //释放锁 比较unique_value是否相等,避免误释放
    if redis.get("key") == unique_value then
        return redis.del("key")
    🎜그러나 위 구현에는 큰 문제가 있습니다. 클라이언트 1이 잠금을 얻었을 때 다음 시나리오가 발생하면 교착 상태가 발생합니다. 🎜🎜프로그램이 비즈니스 로직 예외를 처리하고 제때에 잠금을 해제하지 못합니다. 프로세스가 중단되고 잠금을 해제할 기회가 없습니다. 위의 상황으로 인해 잠금을 획득한 클라이언트가 항상 잠금을 차지하게 됩니다. 다른 클라이언트는 잠금을 얻을 수 없습니다. 🎜🎜🎜교착상태를 피하는 방법🎜🎜🎜위의 교착상태 문제를 해결하기 위해 생각하기 가장 쉬운 해결책은 잠금을 적용하고 Redis에서 구현할 때 잠금의 만료 시간을 설정하는 것이라고 가정합니다. 공유 리소스 작동 시간은 10초를 초과하지 않습니다. 그런 다음 잠글 때 이 키의 만료 시간을 10초로 설정하면 됩니다. 🎜🎜그러나 위 작업에는 여전히 문제가 있습니다. 잠금 및 만료 시간 설정은 두 가지 명령입니다. 첫 번째 명령만 실행되고 두 번째 명령은 실패할 수 있습니다. 예: 🎜🎜 🎜1. SETNX가 성공적으로 실행되었으나 네트워크 문제로 인해 EXPIRE가 실패함
    2. SETNX가 성공적으로 실행되었으나 Redis가 비정상적으로 충돌하여 EXPIRE가 실행될 기회가 없었음
    3. 클라이언트가 비정상적으로 충돌하여 EXPIRE가 실행될 기회가 없었습니다. 🎜🎜🎜 간단히 말해서, 이 두 명령이 원자적 작업임을 보장할 수 없으면 만료 시간 설정이 실패하고 교착 상태 문제가 계속 발생할 위험이 있습니다. 다행히 Redis는 SET 명령의 매개변수를 확장하여 SET과 동시에 EXPIRE 시간을 지정할 수 있습니다. 예를 들어 다음 명령은 잠금 만료 시간을 10초로 설정합니다. . 🎜🎜🎜SET lock_key 1 EX 10 NX🎜🎜🎜지금까지 교착상태 문제는 해결되었지만 여전히 다른 문제가 남아있습니다. 다음 시나리오를 상상해 보세요. 🎜🎜🎜🎜🎜🎜🎜클라이언트 1이 성공적으로 잠기고 공유 리소스 운영을 시작합니다🎜
  2. 客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)

  3. 客户端2加锁成功,开始操作共享资源

  4. 客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。

这里存在两个严重的问题:

  • 锁过期

  • 释放了别人的锁

第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景

第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?

锁被别人给释放了

解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。

//释放锁 比较unique_value是否相等,避免误释放
if redis.get("key") == unique_value then
    return redis.del("key")

这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性问题了。

  1. 客户端1执行GET,判断锁是自己的

  2. 客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)

  3. 客户端1执行DEL,却释放了客户端2的锁

由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成,这样一来GET+DEL之间就不会有其他命令执行了。

以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。

//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

最后我们执行以下命令,即可

redis-cli  --eval  unlock.script lock_key , unique_value

这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:

  1. 加锁时要设置过期时间SET lock_key unique_value EX expire_time NX

  2. 操作共享资源

  3. 释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁

有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。

如何确定锁的过期时间

前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间

Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。

Redis的部署方式对锁的影响

上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。

Redis发展到现在,几种常见的部署架构有:

  • 단일 머신 모드,

  • 마스터-슬레이브 모드,

  • 클러스터 모드,

  • 일반적으로 마스터-슬레이브 모드를 사용합니다. e 클러스터+ 센티넬 모드 배포에서 센티널의 역할은 Redis 노드의 실행 상태를 모니터링하는 것입니다. 일반적인 마스터-슬레이브 모드에서는 마스터가 충돌할 때 슬레이브를 마스터로 만들기 위해 수동으로 전환해야 합니다. 마스터-슬레이브 + 센트리 조합을 사용하면 마스터가 비정상적으로 충돌할 때 센티널이 자동 장애 조치를 구현할 수 있다는 것입니다. 슬레이브를 새로운 마스터로 승격시키고 가용성을 보장하기 위해 서비스를 계속 제공합니다. 그렇다면 마스터-슬레이브 전환이 발생해도 분산 잠금은 여전히 ​​안전할까요?

  • Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

Imagine 이러한 시나리오: 一般会采用主从集群+哨兵的模式部署,哨兵的作用就是监测redis节点的运行状态。普通的主从模式,当master崩溃时,需要手动切换让slave成为master,使用主从+哨兵结合的好处在于,当master异常宕机时,哨兵可以实现故障自动切换,把slave提升为新的master,继续提供服务,以此保证可用性。那么当主从发生切换时,分布式锁依旧安全吗?

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

想像这样的场景:

  1. 客户端1在master上执行SET命令,加锁成功

  2. 此时,master异常宕机,SET命令还未同步到slave上(主从复制是异步的)

  3. 哨兵将slave提升为新的master,但这个锁在新的master上丢失了,导致客户端2来加锁成功了,两个客户端共同操作共享资源

可见,当引入Redis副本后,分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况。

集群模式+Redlock实现高可靠的分布式锁

为了避免Redis实例故障而导致的锁无法工作的问题,Redis的开发者 Antirez提出了分布式锁算法Redlock。Redlock算法的基本思路,是让客户端和多个独立的Redis实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁了,否则加锁失败。这样一来,即使有单个Redis实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。

来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式,无哨兵节点,需要有N个独立的Redis实例(官方推荐至少5个实例)。接下来,我们可以分成3步来完成加锁操作。

Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법

第一步是,客户端获取当前时间。

第二步是,客户端按顺序依次向N个Redis实例执行加锁操作。

这里的加锁操作和在单实例上执行的加锁操作一样,使用SET命令,带上NX、EX/PX选项,以及带上客户端的唯一标识。当然,如果某个Redis实例发生故障了,为了保证在这种情况下,Redlock算法能够继续运行,我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个Redis实例继续请求加锁。一般需要将加锁操作的超时时间设置为锁的有效时间的一小部分,通常约为几十毫秒。

第三步是,一旦客户端完成了和所有Redis实例的加锁操作,客户端就要计算整个加锁过程的总耗时。

客户端只有在满足两个条件时,才能认为是加锁成功,条件一是客户端从超过半数(大于等于 N/2+1)的Redis实例上成功获取到了锁;条件二是客户端获取锁的总耗时没有超过锁的有效时间。

为何只有在大多数实例加锁成功时才能算操作成功?事实上,多个Redis实例一起使用组成了一个分布式系统。在分布式系统中总会出现异常节点,所以在谈论分布式系统时,需要考虑异常节点达到多少个,也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题,这个问题的结论是:如果只存在故障节点,只要大多数节点正常,那么整个系统依旧可以提供正确服务。

在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成共享资源操作,锁就过期了的情况

    🎜클라이언트 1이 마스터에서 SET 명령을 실행하고 잠금에 성공합니다🎜🎜🎜🎜이때 마스터가 비정상적으로 다운되었으며 SET 명령이 아직 동기화되지 않았습니다. 슬레이브에서(마스터-슬레이브 복제는 비동기식입니다) 🎜🎜🎜🎜 Sentinel이 슬레이브를 새 마스터로 승격했지만 새 마스터에서 잠금이 손실되어 클라이언트 2가 성공적으로 잠금을 추가했습니다. , 두 클라이언트가 함께 작동했습니다. 공유 리소스🎜🎜
🎜Redis 복제본이 도입되면 분산 잠금이 여전히 영향을 받을 수 있음을 알 수 있습니다. Redis가 Sentinel을 통해 고가용성을 보장하더라도 어떤 이유로 마스터 노드가 마스터-슬레이브로 전환되면 잠금이 해제됩니다. 🎜🎜클러스터 모드 + Redlock은 신뢰성이 높은 분산 잠금을 구현합니다🎜🎜Redis 인스턴스 실패로 인한 잠금 실패 문제를 방지하기 위해 Redis 개발자 Antirez는 분산 잠금 알고리즘 Redlock을 제안했습니다. Redlock 알고리즘의 기본 아이디어는 클라이언트와 여러 개의 독립적인 Redis 인스턴스가 순차적으로 잠금을 요청하도록 하는 것입니다. 클라이언트가 인스턴스의 절반 이상으로 잠금 작업을 성공적으로 완료할 수 있으면 클라이언트가 성공적으로 잠금 작업을 완료한 것으로 간주합니다. 분산 잠금을 획득하지 않으면 잠금이 실패합니다. 이렇게 하면 하나의 Redis 인스턴스가 실패하더라도 잠금 변수가 다른 인스턴스에도 저장되므로 클라이언트는 정상적으로 잠금 작업을 수행할 수 있으며 잠금 변수가 손실되지 않습니다. 🎜🎜Redlock 알고리즘의 실행 단계를 자세히 살펴보겠습니다. Redlock 알고리즘을 구현하려면 Redis가 센티넬 노드 없이 클러스터 배포 모드를 채택하고 N개의 독립 Redis 인스턴스(공식적으로 최소 5개 인스턴스 권장)를 채택해야 합니다. 다음으로 3단계로 잠금 작업을 완료할 수 있습니다. 🎜🎜Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법🎜🎜아니요 첫 번째 단계는 클라이언트가 현재 시간을 얻는 것입니다. 🎜🎜두 번째 단계는 클라이언트가 N Redis 인스턴스에 대해 잠금 작업을 순차적으로 수행하는 것입니다. 🎜🎜여기서 잠금 작업은 단일 인스턴스에서 수행되는 잠금 작업과 동일하며 NX, EX/PX 옵션 및 클라이언트의 고유 식별자와 함께 SET 명령을 사용합니다. 물론 Redis 인스턴스가 실패하는 경우 Redlock 알고리즘이 계속 실행될 수 있도록 하려면 잠금 작업에 대한 시간 초과를 설정해야 합니다. 클라이언트가 제한 시간까지 Redis 인스턴스에 대한 잠금 요청에 실패하면 이때 클라이언트는 다음 Redis 인스턴스에 계속해서 잠금을 요청합니다. 일반적으로 잠금 작업의 시간 초과를 잠금 유효 시간의 작은 부분(보통 수십 밀리초 정도)으로 설정해야 합니다. 🎜🎜세 번째 단계는 클라이언트가 모든 Redis 인스턴스에 대한 잠금 작업을 완료하면 클라이언트는 전체 잠금 프로세스에 소요된 총 시간을 계산해야 한다는 것입니다. 🎜🎜클라이언트는 두 가지 조건이 충족되는 경우에만 잠금이 성공한 것으로 간주할 수 있습니다. 첫 번째 조건은 클라이언트가 절반 이상(N/2+1 이상)에서 잠금을 성공적으로 획득했다는 것입니다. Redis 인스턴스 두 번째 조건은 클라이언트가 잠금을 획득하는 데 걸리는 총 시간이 잠금의 유효 시간을 초과하지 않음을 의미합니다. 🎜🎜대부분의 인스턴스가 성공적으로 잠긴 경우에만 작업이 성공한 것으로 간주될 수 있는 이유는 무엇입니까? 실제로 여러 Redis 인스턴스가 함께 사용되어 분산 시스템을 구성합니다. 분산 시스템에는 항상 비정상 노드가 있기 때문에 분산 시스템에 대해 이야기할 때 전체 시스템의 올바른 작동에 영향을 주지 않고 비정상 노드가 몇 개 있는지 고려해야 합니다. 이는 분산 시스템의 내결함성 문제입니다. 이 문제의 결론은 다음과 같습니다. 결함이 있는 노드만 있으면 대부분의 노드가 정상인 한 전체 시스템은 여전히 ​​올바른 서비스를 제공할 수 있습니다. 🎜🎜이 두 가지 조건을 충족한 후 잠금 유효 시간을 다시 계산해야 합니다. 계산 결과는 잠금의 초기 유효 시간에서 클라이언트가 잠금을 획득하는 데 소요한 총 시간을 뺀 값입니다. 잠금의 유효 시간이 너무 늦어서 공유 데이터 작업을 완료할 수 없는 경우 공유 리소스 작업이 완료되기 전에 잠금이 만료되는 것을 방지하기 위해 잠금을 해제할 수 있습니다. 🎜

当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁

在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。

二、代码实现Redis分布式锁

1.SpringBoot整合redis用到最多的当然属于我们的老朋友RedisTemplate,pom依赖如下:

<!-- springboot整合redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.Redis配置类:

package com.example.redisdemo.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis配置类
 * @author Keson
 * @date 21:20 2022/11/14
 * @Param
 * @return
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 设置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

3.Service层面

package com.example.redisdemo.service;

import com.example.redisdemo.entity.CustomerBalance;
import java.util.concurrent.Callable;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO
 * @date 2022/11/14 15:12
 */
public interface RedisService {

    <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception;
}
package com.example.redisdemo.service.impl;

import com.example.redisdemo.entity.CustomerBalance;
import com.example.redisdemo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Redis实现分布式锁
 * @date 2022/11/14 15:13
 */
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {

    //设置默认过期时间
    private final static int DEFAULT_LOCK_EXPIRY_TIME = 20;
    //自定义lock key前缀
    private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{
        //自定义lock key
        String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode());
        //将UUID当做value,确保唯一性
        String lockReference = UUID.randomUUID().toString();

        try {
            if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) {
                throw new Exception("lock加锁失败");
            }
            return callable.call();
        } finally {
            unlock(lockKey, lockReference);
        }
    }

    //定义lock key
    String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) {
        return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode);
    }

    //redis加锁
    private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) {
        Boolean locked;
        try {
            //SET_IF_ABSENT --> NX: Only set the key if it does not already exist.
            //SET_IF_PRESENT --> XX: Only set the key if it already exist.
            locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8),
                            Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        } catch (Exception e) {
            log.error("Lock failed for redis key: {}, value: {}", key, value);
            locked = false;
        }
        return locked != null && locked;
    }

    //redis解锁
    private boolean unlock(String key, String value) {
        try {
            //使用lua脚本保证删除的原子性,确保解锁
            String script = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] " +
                            "then return redis.call(&#39;del&#39;, KEYS[1]) " +
                            "else return 0 end";
            Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
                            key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)));
            return unlockState == null || !unlockState;
        } catch (Exception e) {
            log.error("unLock failed for redis key: {}, value: {}", key, value);
            return false;
        }
    }
}

4.业务调用实现分布式锁示例:

    @Override
    public int updateById(CustomerBalance customerBalance) throws Exception {
        return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance));
    }

위 내용은 Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제