首頁  >  文章  >  Java  >  分散式鎖:5個案例,從入門到入土

分散式鎖:5個案例,從入門到入土

Java后端技术全栈
Java后端技术全栈轉載
2023-08-24 14:48:03942瀏覽

今天要跟大家分享的是分散式鎖定#,本文使用五個案例、圖、原始碼分析等來分析。

常見的synchronized、Lock等這些鎖都是基於單一JVM的實作的,如果分散式場景下怎麼辦呢?這時候分散式鎖就出現了。

關於分散式的實作方案,在業界流行的有三種:

1、基於資料庫

2、基於Redis

3、基於Zookeeper

另外,還有使用etcdconsul來實現的。

在開發中使用最多的是RedisZookeeper兩個方案,而兩個方案中最複雜的,最容易出問題的就是Redis的實作方案,所以,我們今天就來把Redis實作方案都聊聊。

本文主要內容

分散式鎖:5個案例,從入門到入土


分散式鎖定場景

估計部分朋友還不太清楚分散式的使用場景,下面我簡單羅列三種:分散式鎖:5個案例,從入門到入土


#「案例1

## #####如下程式碼模擬了下單減庫存的場景,我們分析下在高並發場景下會存在什麼問題###
@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 模拟下单减库存的场景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
        return "end";
    }
}
###假設在###Redis###中庫存(stock)初始值是100。 ######現在有5個客戶端同時請求該接口,可能就會存在同時執行###
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

这行代码,获取到的值都为100,紧跟着判断大于0后都进行-1操作,最后设置到redis 中的值都为99。但正常执行完成后redis中的值应为 95。

案例2-使用synchronized 实现单机锁

在遇到案例1的问题后,大部分人的第一反应都会想到加锁来控制事务的原子性,如下代码所示:

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    synchronized (this){
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }
    return "end";
}

现在当有多个请求访问该接口时,同一时刻只有一个请求可进入方法体中进行库存的扣减,其余请求等候。

但我们都知道,synchronized 锁是属于JVM级别的,也就是我们俗称的“单机锁”。但现在基本大部分公司使用的都是集群部署,现在我们思考下以上代码在集群部署的情况下还能保证库存数据的一致性吗?

分散式鎖:5個案例,從入門到入土

答案是不能,如上图所示,请求经Nginx分发后,可能存在多个服务同时从Redis中获取库存数据,此时只加synchronized (单机锁)是无效的,并发越高,出现问题的几率就越大。

案例3-使用SETNX实现分布式锁

setnx:将 key 的值设为 value,当且仅当 key 不存在。

若给定 key 已经存在,则 setnx 不做任何动作。

使用setnx实现简单的分布式锁:

/**
 * 模拟下单减库存的场景
 * @return
 */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    // 使用 setnx 添加分布式锁
    // 返回 true 代表之前redis中没有key为 lockKey 的值,并已进行成功设置
    // 返回 false 代表之前redis中已经存在 lockKey 这个key了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
    if(!result){
        // 代表已经加锁了
        return "error_code";
    }

    // 从redis 中拿当前库存的值
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock",realStock + "");
        System.out.println("扣减成功,剩余库存:" + realStock);
    }else{
        System.out.println("扣减失败,库存不足");
    }

    // 释放锁
    stringRedisTemplate.delete(lockKey);
    return "end";
}

我们知道 Redis 是单线程执行,现在再看案例2中的流程图时,哪怕高并发场景下多个请求都执行到了setnx的代码,redis会根据请求的先后顺序进行排列,只有排列在队头的请求才能设置成功。其它请求只能返回“error_code”。

当setnx设置成功后,可执行业务代码对库存扣减,执行完成后对锁进行释放

我们再来思考下以上代码已经完美实现分布式锁了吗?能够支撑高并发场景吗?答案并不是,上面的代码还是存在很多问题的,离真正的分布式锁还差的很远。

我们分析一下,上面的代码存在的问题:

死锁:假如第一个请求在setnx加锁完成后,执行业务代码时出现了异常,那释放锁的代码就无法执行,后面所有的请求也都无法进行操作了。

针对死锁的问题,我们对代码再次进行优化,添加try-finally,在finally中添加释放锁代码,这样无论如何都会执行释放锁代码,如下所示:

/**
     * 模拟下单减库存的场景
     * @return
     */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";

    try{
        // 使用 setnx 添加分布式锁
        // 返回 true 代表之前redis中没有key为 lockKey 的值,并已进行成功设置
        // 返回 false 代表之前redis中已经存在 lockKey 这个key了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
        if(!result){
            // 代表已经加锁了
            return "error_code";
        }
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }finally {
        // 释放锁
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

经过改进后的代码是否还存在问题呢?我们思考正常执行的情况下应该是没有问题,但我们假设请求在执行到业务代码时服务突然宕机了,或者正巧你的运维同事重新发版,粗暴的 kill -9 掉了呢,那代码还能执行 finally 吗?

案例4-加入过期时间

针对想到的问题,对代码再次进行优化,加入过期时间,这样即便出现了上述的问题,在时间到期后锁也会自动释放掉,不会出现“死锁”的情况。

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";

    try{
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
        if(!result){
            // 代表已经加锁了
            return "error_code";
        }
        // 从redis 中拿当前库存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        }else{
            System.out.println("扣减失败,库存不足");
        }
    }finally {
        // 释放锁
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

现在我们再思考一下,给锁加入过期时间后就可以了吗?就可以完美运行不出问题了吗?

超时时间设置的10s真的合适吗?如果不合适设置多少秒合适呢?如下图所示

分散式鎖:5個案例,從入門到入土
圖片

假設同一時間有三個請求。

  • 請求1先加鎖後需執行15秒,但執行到10秒時鎖定失效釋放。
  • 請求2進入後加鎖執行,在請求2執行到5秒時,請求1執行完成進行鎖定釋放,但此時釋放掉的是請求2的鎖定。
  • 請求3在請求2執行5秒時開始執行,但在執行到3秒時請求2執行完成將請求3的鎖定進行釋放。

我們現在只是模擬3個請求便可看出問題,如果在真正高並發的場景下,可能鎖就會面臨「一直失效」或「永久失效」。

那麼具體問題出在哪裡呢?總結為以下幾點:

  • 1.存在請求釋放鎖定時釋放掉的並不是自己的鎖定
  • 2.超時時間過短,存在程式碼未執行完便自動釋放

針對問題我們思考對應的解決方法:

  • 針對問題1,我們想到在請求進入時產生一個唯一id,使用該唯一id作為鎖的value值,釋放時先進行獲取比對,比對相同時再進行釋放,這樣就可以解決釋放掉其它請求鎖的問題。
  • 針對問題2,我們思考不斷的延長過期時間真的適合嗎?設定短了有超時自動釋放的問題,設定長了又會出現宕機後一段時間鎖無法釋放的問題,雖然不會再出現「死鎖」。針對這個問題,如何解決呢?

案例5-Redisson分散式鎖定

Spring Boot整合Redisson步驟

#

引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

初始化客户端

@Bean
public RedissonClient redisson(){
    // 单机模式
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.3.170:6379").setDatabase(0);
    return Redisson.create(config);
}

Redisson实现分布式锁

@RestController
public class IndexController {

    @Autowired
    private RedissonClient redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 模拟下单减库存的场景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        String lockKey = "product_001";
        // 1.获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        try{
            // 2.加锁
            redissonLock.lock();  // 等价于 setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
            // 从redis 中拿当前库存的值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            }else{
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            // 3.释放锁
            redissonLock.unlock();
        }
        return "end";
    }
}

Redisson 分布式锁实现原理图

分散式鎖:5個案例,從入門到入土
图片

Redisson 底层源码分析

我们点击lock()方法,查看源码,最终看到以下代码

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                      "redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); " +
                      "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); " +
                      "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call(&#39;pttl&#39;, KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

没错,加锁最终执行的就是这段lua 脚本语言。

if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then 
    redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); 
    redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); 
    return nil; 
end;

脚本的主要逻辑为:

  • exists 判断 key 是否存在
  • 当判断不存在则设置 key
  • 然后给设置的key追加过期时间

这样来看其实和我们前面案例中的实现方法好像没什么区别,但实际上并不是。

这段lua脚本命令在Redis中执行时,会被当成一条命令来执行,能够保证原子性,故要不都成功,要不都失败。

我们在源码中看到Redssion的许多方法实现中很多都用到了lua脚本,这样能够极大的保证命令执行的原子性。

下面是Redisson锁自动“续命”源码:

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                                                     "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                                                                     "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                                                                     "return 1; " +
                                                                     "end; " +
                                                                     "return 0;",
                                                                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can&#39;t update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

这段代码是在加锁后开启一个守护线程进行监听Redisson超时时间默认设置30s,线程每10s调用一次判断锁还是否存在,如果存在则延长锁的超时时间。

现在,我们再回过头来看看案例5中的加锁代码与原理图,其实完善到这种程度已经可以满足很多公司的使用了,并且很多公司也确实是这样用的。但我们再思考下是否还存在问题呢?例如以下场景:

  • 眾所周知Redis在實際部署使用時都是叢集部署的,那在高並發場景下我們加鎖,當把key寫入到master節點後,master還未同步到slave節點時master宕機了,原有的slave節點經過選舉變為了新的master節點,此時可能就會出現鎖定失效問題。
  • 透過分散式鎖定的實作機制我們知道,高並發場景下只有加鎖成功的請求可以繼續處理業務邏輯。那就出現了大夥都來加鎖,但有且僅有一個加鎖成功了,剩餘的都在等待。其實分散式鎖與高並發在語意上就是相違背的,我們的請求雖然都是並發,但Redis幫我們把請求進行了排隊執行,也就是把我們的並行轉為了串行。串行執行的程式碼肯定不存在並發問題了,但是程式的效能肯定也會因此受到影響。

針對這些問題,我們再次思考解決方案

  • 在思考解決方案時我們首先想到CAP原則(一致性、可用性、分區容錯性),那麼現在的Redis就是滿足AP(可用性、分區容錯性),如果想要解決該問題我們就需要尋找滿足CP(一致性、分區容錯性)的分散式系統。首先想到的就是ZookeeperZookeeper的群集間資料同步機制是當主節點接收資料後不會立即回傳給客戶端成功的回饋,它會先與子節點進行數據同步,半數以上的節點都完成同步後才會通知客戶端接收成功。且如果主節點宕機後,根據ZookeeperZab協議(Zookeeper原子廣播)重新選舉的主節點一定是已經同步成功的。

    那麼問題來了,RedissonZookeeper分散式鎖定我們要如何選擇呢?答案是如果並發量沒有那麼高,可以用Zookeeper來做分散式鎖,但是它的並發能力遠遠不如Redis。如果你對並發要求比較高的話,那就用Redis,偶爾出現的主從架構鎖失效的問題其實是可以容忍的。

  • 關於第二個提升效能的問題,我們可以參考ConcurrentHashMap的鎖定分段技術的思想,例如我們程式碼的庫存量目前為1000,那我們可以分為10段,每段100,然後對每段分別加鎖,這樣就可以同時執行10個請求的加鎖與處理,當然有要求的同學還可以繼續細分。但其實RedisQps已經達到10W 了,沒有特別高並發量的場景下也是完全夠用的。

以上是分散式鎖:5個案例,從入門到入土的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:Java后端技术全栈。如有侵權,請聯絡admin@php.cn刪除