搜尋
首頁資料庫RedisRedis分散式鎖定實例分析

分散式鎖定概覽

在多執行緒的環境下,為了保證一個程式碼區塊在同一時間只能由一個執行緒訪問,Java中我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。但現在公司都是流行分散式架構,在分散式環境下,如何確保不同節點的執行緒同步執行呢?因此就引出了分散式鎖,它是控制分散式系統之間互斥存取共享資源的一種方式。

在一個分散式系統中,多台機器上部署了多個服務,當客戶端一個使用者發起一個資料插入請求時,如果沒有分散式鎖定機制保證,那麼多台機器上的多個服務可能會進行並發插入操作,導致資料重複插入,對於某些不允許有多餘資料的業務來說,這就會造成問題。而分散式鎖機制就是為了解決類似這類問題,確保多個服務之間互斥的存取共享資源,如果一個服務搶佔了分散式鎖,其他服務沒取得到鎖,就不進行後續操作。大致意思如下圖:

Redis分散式鎖定實例分析

分散式鎖定的特性

分散式鎖定一般有如下的特點:

  • #互斥性: 同一時刻只能有一個執行緒持有鎖定

  • 可重入性: 同一節點上的同一個執行緒如果取得了鎖之後能夠再次取得鎖定

  • 鎖定逾時:和J.U.C中的鎖定一樣支援鎖定逾時,防止死鎖定

  • 高效能和高可用: 加鎖和解鎖需要高效,同時也需要確保高可用,防止分散式鎖定失效

  • 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒

#分散式鎖定的實作方式

我們一般實作分散式鎖定有以下幾種方式:

  • 基於資料庫

  • #基於Redis

  • 基於zookeeper

#Redis普通分散式鎖定存在的問題

說到Redis分散式鎖定,大部分人都會想到:setnx lua(redis保證執行lua腳本時不執行其他操作,保證操作的原子性),或知道set key value px milliseconds nx。後者方式的核心實作指令如下:

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
else   
    return 0
end

這種實作方式有3個重點(也是面試機率非常高的地方):

  1. set指令要用set key value px milliseconds nx;

  2. value要具有唯一性;

  3. 釋放鎖定時要驗證value值,不能誤解鎖;

事實上這類鎖最大的缺點就是它加鎖時只作用在一個Redis節點上,即使Redis透過sentinel保證高可用,如果這個master節點因為某些原因發生了主從切換,那麼就會出現鎖丟失的情況:

  1. 在Redis的master節點上拿到了鎖;

  2. 但是這個加鎖的key還沒有同步到slave節點;

  3. master故障,發生故障轉移,slave節點升級到master節點;

  4. 導致鎖定遺失。

為了避免單點故障問題,Redis作者antirez基於分散式環境下提出了更進階的分散式鎖定的實作方式:Redlock。 Redlock也是Redis所有分散式鎖定實現方式中唯一能讓面試官高潮的方式。

Redis高階分散式鎖定:Redlock

antirez提出的redlock演算法大概是這樣的:

在Redis的分散式環境中,我們假設有N個Redis master 。這些節點完全互相獨立,不存在主從複製或其他群集協調機制。我們確保將在N個實例上使用與在Redis單一實例下相同方法取得和釋放鎖定。現在我們假設有5個Redis master節點,同時我們需要在5台伺服器上面執行這些Redis實例,這樣確保他們不會同時都宕掉。

為了取到鎖,客戶端應該執行以下操作:

  • 取得目前Unix時間,以毫秒為單位。

  • 依序嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)取得鎖定。當向Redis請求取得鎖定時,客戶端應該設定一個網路連線和回應逾時時間,而這個逾時時間應該小於鎖定的失效時間。例如你的鎖定自動失效時間TTL為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免伺服器端Redis已經掛掉的情況下,客戶端還在死死地等待回應結果。如果伺服器端沒有在規定時間內回應,客戶端應該盡快嘗試去另一個Redis實例請求取得鎖定。

  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功

  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

  • 此处不讨论时钟漂移

Redis分散式鎖定實例分析

Redlock源码

redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

1. Redlock依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>

2. Redlock用法

首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
        .setMasterName("masterName")
        .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();
    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}

3. Redlock唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
    return id + ":" + threadId;
}

4. Redlock获取锁

获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 获取锁时向5个redis实例发送的命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
              "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                  "redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
              "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 获取分布式锁的KEY的失效时间毫秒数
              "return redis.call(&#39;pttl&#39;, KEYS[1]);",
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;

  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

5. Redlock释放锁

释放锁的代码为redLock.unlock(),核心源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 向5个redis实例都执行如下命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call(&#39;del&#39;, KEYS[1]); " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Redis实现的分布式锁轮子

下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。

1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /**
     * 业务键
     *
     * @return
     */
    String key();
    /**
     * 锁的过期秒数,默认是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 尝试加锁,最多等待时间
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;
    /**
     * 锁的超时时间单位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

2. AOP拦截器实现

在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("获取锁失败");
                throw new RuntimeException("获取锁失败");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系统异常");
            }
        }  finally {
            logger.info("释放锁");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}

3. Redis实现分布式锁核心类

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /**
     * 直接使用setnx + expire方式获取分布式锁
     * 非原子性
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1时,设置成功,否则设置失败
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /**
     * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
     *
     * @param jedis
     * @param key
     * @param UniqueId
     * @param seconds
     * @return
     */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 then" +
                "redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判断是否成功
        return result.equals(1L);
    }

    /**
     * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 自定义获取锁的超时时间
     *
     * @param jedis
     * @param key
     * @param value
     * @param timeout
     * @param waitTime
     * @param timeUnit
     * @return
     * @throws InterruptedException
     */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /**
     * 错误的解锁方法—直接删除key
     *
     * @param key
     */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1] then " +
                "return redis.call(&#39;del&#39;,KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}

4. Controller层控制

定义一个TestController来测试我们实现的分布式锁

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}

以上是Redis分散式鎖定實例分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
REDIS:它如何充當數據存儲和服務REDIS:它如何充當數據存儲和服務Apr 24, 2025 am 12:08 AM

REDISACTSASBOTHADATASTOREANDASERVICE.1)ASADATASTORE,ITUSESIN-MEMORYSTOOGATOFORFOFFASTESITION,支持VariousDatharptructuresLikeKey-valuepairsandsortedsetsetsetsetsetsetsets.2)asaservice,ItprovidespunctionslikeItionitionslikepunikeLikePublikePublikePlikePlikePlikeAndluikeAndluAascriptingiationsmpleplepleclexplectiations

REDIS與其他數據庫:比較分析REDIS與其他數據庫:比較分析Apr 23, 2025 am 12:16 AM

Redis與其他數據庫相比,具有以下獨特優勢:1)速度極快,讀寫操作通常在微秒級別;2)支持豐富的數據結構和操作;3)靈活的使用場景,如緩存、計數器和發布訂閱。選擇Redis還是其他數據庫需根據具體需求和場景,Redis在高性能、低延遲應用中表現出色。

REDIS的角色:探索數據存儲和管理功能REDIS的角色:探索數據存儲和管理功能Apr 22, 2025 am 12:10 AM

Redis在數據存儲和管理中扮演著關鍵角色,通過其多種數據結構和持久化機製成為現代應用的核心。 1)Redis支持字符串、列表、集合、有序集合和哈希表等數據結構,適用於緩存和復雜業務邏輯。 2)通過RDB和AOF兩種持久化方式,Redis確保數據的可靠存儲和快速恢復。

REDIS:了解NOSQL概念REDIS:了解NOSQL概念Apr 21, 2025 am 12:04 AM

Redis是一種NoSQL數據庫,適用於大規模數據的高效存儲和訪問。 1.Redis是開源的內存數據結構存儲系統,支持多種數據結構。 2.它提供極快的讀寫速度,適合緩存、會話管理等。 3.Redis支持持久化,通過RDB和AOF方式確保數據安全。 4.使用示例包括基本的鍵值對操作和高級的集合去重功能。 5.常見錯誤包括連接問題、數據類型不匹配和內存溢出,需注意調試。 6.性能優化建議包括選擇合適的數據結構和設置內存淘汰策略。

REDIS:現實世界的用例和示例REDIS:現實世界的用例和示例Apr 20, 2025 am 12:06 AM

Redis在現實世界中的應用包括:1.作為緩存系統加速數據庫查詢,2.存儲Web應用的會話數據,3.實現實時排行榜,4.作為消息隊列簡化消息傳遞。 Redis的多功能性和高性能使其在這些場景中大放異彩。

REDIS:探索其功能和功能REDIS:探索其功能和功能Apr 19, 2025 am 12:04 AM

Redis脫穎而出是因為其高速、多功能性和豐富的數據結構。 1)Redis支持字符串、列表、集合、散列和有序集合等數據結構。 2)它通過內存存儲數據,支持RDB和AOF持久化。 3)從Redis6.0開始引入多線程處理I/O操作,提升了高並發場景下的性能。

Redis是SQL還是NOSQL數據庫?答案解釋了Redis是SQL還是NOSQL數據庫?答案解釋了Apr 18, 2025 am 12:11 AM

RedisisclassifiedasaNoSQLdatabasebecauseitusesakey-valuedatamodelinsteadofthetraditionalrelationaldatabasemodel.Itoffersspeedandflexibility,makingitidealforreal-timeapplicationsandcaching,butitmaynotbesuitableforscenariosrequiringstrictdataintegrityo

REDIS:提高應用程序性能和可擴展性REDIS:提高應用程序性能和可擴展性Apr 17, 2025 am 12:16 AM

Redis通過緩存數據、實現分佈式鎖和數據持久化來提升應用性能和可擴展性。 1)緩存數據:使用Redis緩存頻繁訪問的數據,提高數據訪問速度。 2)分佈式鎖:利用Redis實現分佈式鎖,確保在分佈式環境中操作的安全性。 3)數據持久化:通過RDB和AOF機制保證數據安全性,防止數據丟失。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

Atom編輯器mac版下載

Atom編輯器mac版下載

最受歡迎的的開源編輯器

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。