在聊天分散式鎖定之前,有必要先解釋一下,為什麼需要分散式鎖定
。
與分散式鎖相對就的是單機鎖,我們在寫多執行緒程式時,避免同時操作一個共享變數產生資料問題,通常會使用一把鎖來互斥以保證共享變數的正確性,其使用範圍在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥?現在的業務應用通常是微服務架構,這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序導致髒數據,此時就需要引入分佈式鎖了。
想要實作分散式鎖,必須藉助一個外部系統,所有行程都去這個系統上申請加鎖。這個外部系統必須具有互斥能力,也就是說,如果兩個請求同時到達,系統只會成功地為一個進程加鎖,而另一個進程會失敗。這個外部系統可以是資料庫,也可以是Redis或Zookeeper,但為了追求效能,我們通常會選擇使用Redis或Zookeeper來做。
Redis可以作為一個共享儲存系統,多個客戶端可以共享訪問,因此可以被用來保存分散式鎖定。而且 Redis 的讀寫效能高,可以應付高並發的鎖定操作場景。這篇文章的重點在於介紹如何使用Redis實現分散式鎖定,並探討在實作過程中可能會遇到的問題。
作為分散式鎖定實作過程中的共用儲存系統,Redis可以使用鍵值對來保存鎖定變量,在接收和處理不同客戶端發送的加鎖和釋放鎖的操作請求。那麼,鍵值對的鍵和值具體是怎麼定的呢?我們要賦予鎖變數一個變數名,把這個變數名當作鍵值對的鍵,而鎖變數的值,則是鍵值對的值,這樣一來,Redis就能保存鎖變數了,客戶端也就可以透過Redis的命令操作來實現鎖定操作。
想要實作分散式鎖定,必須要求Redis有互斥的能力。可以使用SETNX指令,其意義是SET IF NOT EXIST,也就是如果key不存在,才會設定它的值,否則什麼都不做。實作一種分散式鎖的方法是,兩個客戶端進程互斥地執行該命令。
以下展示了Redis使用key/value對保存鎖定變量,以及兩個客戶端同時請求加鎖的操作過程。
加上鎖定作業完成後,加上鎖定成功的客戶端,就可以去操作共享資源,例如,修改MySQL的某一行資料。操作完成後,也要及時釋放鎖,給後來者讓出操作共享資源的機會。如何釋放鎖呢?直接使用DEL指令刪除這個key即可。這個邏輯非常簡單,整體的流程寫成偽程式碼就是下面這樣。
// 加锁 SETNX lock_key 1 // 业务逻辑 DO THINGS // 释放锁 DEL lock_key
但是,以上實作有一個很大的問題,當客戶端1拿到鎖後,如果發生下面的場景,就會造成死鎖。
程式處理業務邏輯異常,沒及時釋放鎖定進程掛了,沒機會釋放鎖定
以上情況會導致已經取得鎖定的用戶端一直佔用鎖,其他用戶端永遠無法取得到鎖。
為了解決以上死鎖問題,最容易想到的方案是在申請鎖定時,在Redis中實作時,給鎖定設定一個過期時間,假設操作共享資源的時間不會超過10s,那麼加鎖時,給這個key設定10s過期即可。
但以上操作還是有問題,加鎖、設定過期時間是2條指令,有可能只執行了第一條,第二條卻執行失敗
,例如:
1.SETNX執行成功,執行EXPIRE時由於網路問題,執行失敗
2.SETNX執行成功,Redis異常宕機,EXPIRE沒有機會執行
3.SETNX執行成功,客戶端異常崩潰,EXPIRE沒有機會執行
總之這兩條指令如果不能保證是原子操作,就有潛在的風險導致過期時間設定失敗,依舊有可能發生死鎖問題
。幸好在Redis 2.6.12之後,Redis擴展了SET指令的參數,可以在SET的同時指定EXPIRE時間,這條操作是原子的,例如以下指令是設定鎖的過期時間為10秒。
SET lock_key 1 EX 10 NX
#至此,解決了死鎖問題,但還是有其他問題。想像下面這個這樣一個場景:
客戶端1加鎖定成功,開始操作共享資源
客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)
客户端2加锁成功,开始操作共享资源
客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。
这里存在两个严重的问题:
锁过期
释放了别人的锁
第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景
。
第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?
解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID
,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。
//释放锁 比较unique_value是否相等,避免误释放 if redis.get("key") == unique_value then return redis.del("key")
这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性
问题了。
客户端1执行GET,判断锁是自己的
客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)
客户端1执行DEL,却释放了客户端2的锁
由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成
,这样一来GET+DEL之间就不会有其他命令执行了。
以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。
//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
最后我们执行以下命令,即可
redis-cli --eval unlock.script lock_key , unique_value
这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:
加锁时要设置过期时间SET lock_key unique_value EX expire_time NX
操作共享资源
释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁
有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。
前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间
。
Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。
那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。
上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。
Redis发展到现在,几种常见的部署架构有:
單機模式;
主從模式;
哨兵(sentinel)模式;
叢集模式;
我們使用Redis時,一般會採用主從叢集哨兵的模式部署,哨兵的功能就是監測redis節點的運行狀態。普通的主從模式,當master崩潰時,需要手動切換讓slave成為master,使用主從哨兵結合的好處在於,當master異常宕機時,哨兵可以實現故障自動切換,把slave提升為新的master,繼續提供服務,以此保證可用性
。那麼當主從發生切換時,分散式鎖依舊安全嗎?
想像這樣的場景:
#客戶端1在master上執行SET指令,加鎖成功
#此時,master異常宕機,SET指令還未同步到slave上(主從複製是異步的)
哨兵將slave提升為新的master,但這個鎖在新的master上丟失了,導致客戶端2來加鎖成功了,兩個客戶端共同操作共享資源
可見,當引入Redis副本後,分散式鎖還是可能受到影響。即使Redis透過sentinel保證高可用,如果這個master節點因為某些原因發生了主從切換,那麼就會出現鎖丟失的情況。
叢集模式Redlock實作高可靠的分散式鎖定
為了避免Redis實例故障而導致的鎖定無法運作的問題,Redis的開發者Antirez提出了分佈式鎖演算法Redlock。 Redlock演算法的基本思路,是讓客戶端和多個獨立的Redis實例依序請求加鎖,如果客戶端能夠和半數以上的實例成功地完成加鎖操作,那麼我們就認為,客戶端成功地獲得分散式鎖定了,否則加鎖失敗
。這樣一來,即使有單一Redis實例發生故障,因為鎖定變數在其它實例上也有保存,所以,客戶端仍然可以正常地進行鎖定操作,鎖定變數並不會遺失。
來具體看下Redlock演算法的執行步驟。 Redlock演算法的實作要求Redis採用叢集部署模式,無哨兵節點,需要有N個獨立的Redis實例(官方建議至少5個實例)。接下來,我們可以分成3步驟來完成加鎖操作。
第一步是,客戶端取得目前時間。
第二步是,客戶端依序向N個Redis實例執行加鎖操作。
這裡的加鎖操作和在單一實例上執行的加鎖操作一樣,使用SET指令,帶上NX、EX/PX選項,以及帶上客戶端的唯一識別。當然,如果某個Redis實例發生故障了,為了確保在這種情況下,Redlock演算法能夠繼續運行,我們需要為加鎖操作設定一個逾時時間。如果客戶端在和一個Redis實例請求加鎖時,一直到逾時都沒有成功,那麼此時,客戶端會和下一個Redis實例繼續請求加鎖。一般需要將加鎖操作的超時時間設定為鎖的有效時間的一小部分,通常約為幾十毫秒。
第三步是,一旦客戶端完成了和所有Redis實例的加鎖操作,客戶端就要計算整個加鎖過程的總耗時。
客戶端只有在滿足兩個條件時,才能認為是加鎖成功,條件一是客戶端從超過半數(大於等於N/2 1)的Redis實例上成功獲取到了鎖;條件二是客戶端取得鎖的總耗時沒有超過鎖的有效時間。
為何只有在大多數實例加鎖成功時才能算操作成功?事實上,多個Redis實例一起使用組成了一個分散式系統。在分散式系統中總是會出現異常節點,所以在談論分散式系統時,需要考慮異常節點達到多少個,也依舊不影響整個系統的正確運作。這是一個分散式系統的容錯問題,這個問題的結論是:如果只存在故障節點,只要大多數節點正常,那麼整個系統依舊可以提供正確服務。
在滿足了這兩個條件後,我們需要重新計算這把鎖的有效時間,計算的結果是鎖的最初有效時間減去客戶端為取得鎖的總耗時。如果鎖的有效時間已經來不及完成共享資料的操作了,我們可以釋放鎖,以免出現還沒完成共享資源操作,鎖就過期了的情況
。
当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作
。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁
。
在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。
<!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
package com.example.redisdemo.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @description: Redis配置类 * @author Keson * @date 21:20 2022/11/14 * @Param * @return * @version 1.0 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 设置序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer);// key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化 redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
package com.example.redisdemo.service; import com.example.redisdemo.entity.CustomerBalance; import java.util.concurrent.Callable; /** * @author Keson * @version 1.0 * @description: TODO * @date 2022/11/14 15:12 */ public interface RedisService { <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception; }
package com.example.redisdemo.service.impl; import com.example.redisdemo.entity.CustomerBalance; import com.example.redisdemo.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author Keson * @version 1.0 * @description: TODO Redis实现分布式锁 * @date 2022/11/14 15:13 */ @Service @Slf4j public class RedisServiceImpl implements RedisService { //设置默认过期时间 private final static int DEFAULT_LOCK_EXPIRY_TIME = 20; //自定义lock key前缀 private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE"; @Autowired private RedisTemplate redisTemplate; @Override public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{ //自定义lock key String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode()); //将UUID当做value,确保唯一性 String lockReference = UUID.randomUUID().toString(); try { if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) { throw new Exception("lock加锁失败"); } return callable.call(); } finally { unlock(lockKey, lockReference); } } //定义lock key String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) { return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode); } //redis加锁 private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) { Boolean locked; try { //SET_IF_ABSENT --> NX: Only set the key if it does not already exist. //SET_IF_PRESENT --> XX: Only set the key if it already exist. locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT)); } catch (Exception e) { log.error("Lock failed for redis key: {}, value: {}", key, value); locked = false; } return locked != null && locked; } //redis解锁 private boolean unlock(String key, String value) { try { //使用lua脚本保证删除的原子性,确保解锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then return redis.call('del', KEYS[1]) " + "else return 0 end"; Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8))); return unlockState == null || !unlockState; } catch (Exception e) { log.error("unLock failed for redis key: {}, value: {}", key, value); return false; } } }
@Override public int updateById(CustomerBalance customerBalance) throws Exception { return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance)); }
以上是怎麼在SpringBoot中使用Redis實現分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!