搜索
首页数据库RedisRedis分布式锁实例分析

分布式锁概览

在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问,Java中我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。但是现在公司都是流行分布式架构,在分布式环境下,如何保证不同节点的线程同步执行呢?因此就引出了分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。

在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。大致意思如下图所示:

Redis分布式锁实例分析

分布式锁的特点

分布式锁一般有如下的特点:

  • 互斥性: 同一时刻只能有一个线程持有锁

  • 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁

  • 锁超时:和J.U.C中的锁一样支持锁超时,防止死锁

  • 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效

  • 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒

分布式锁的实现方式

我们一般实现分布式锁有以下几种方式:

  • 基于数据库

  • 基于Redis

  • 基于zookeeper

Redis普通分布式锁存在的问题

说到Redis分布式锁,大部分人都会想到:setnx lua(redis保证执行lua脚本时不执行其他操作,保证操作的原子性),或者知道set key value px milliseconds nx。后一种方式的核心实现命令如下:

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
else   
    return 0
end

这种实现方式有3大要点(也是面试概率非常高的地方):

  1. set命令要用set key value px milliseconds nx

  2. value要具有唯一性;

  3. 释放锁时要验证value值,不能误解锁;

事实上这类锁最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  1. 在Redis的master节点上拿到了锁;

  2. 但是这个加锁的key还没有同步到slave节点;

  3. master故障,发生故障转移,slave节点升级为master节点;

  4. 导致锁丢失。

为了避免单点故障问题,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

Redis高级分布式锁:Redlock

antirez提出的redlock算法大概是这样的:

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  • 获取当前Unix时间,以毫秒为单位。

  • 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间TTL为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功

  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

  • 此处不讨论时钟漂移

Redis分布式锁实例分析

Redlock源码

redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

1. Redlock依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>

2. Redlock用法

首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
        .setMasterName("masterName")
        .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();
    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}

3. Redlock唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
    return id + ":" + threadId;
}

4. Redlock获取锁

获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 获取锁时向5个redis实例发送的命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
              "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                  "redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
              "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 获取分布式锁的KEY的失效时间毫秒数
              "return redis.call(&#39;pttl&#39;, KEYS[1]);",
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;

  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

5. Redlock释放锁

释放锁的代码为redLock.unlock(),核心源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 向5个redis实例都执行如下命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call(&#39;del&#39;, KEYS[1]); " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Redis实现的分布式锁轮子

下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。

1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /**
     * 业务键
     *
     * @return
     */
    String key();
    /**
     * 锁的过期秒数,默认是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 尝试加锁,最多等待时间
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;
    /**
     * 锁的超时时间单位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

2. AOP拦截器实现

在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("获取锁失败");
                throw new RuntimeException("获取锁失败");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系统异常");
            }
        }  finally {
            logger.info("释放锁");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}

3. Redis实现分布式锁核心类

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /**
     * 直接使用setnx + expire方式获取分布式锁
     * 非原子性
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1时,设置成功,否则设置失败
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /**
     * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
     *
     * @param jedis
     * @param key
     * @param UniqueId
     * @param seconds
     * @return
     */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 then" +
                "redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判断是否成功
        return result.equals(1L);
    }

    /**
     * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 自定义获取锁的超时时间
     *
     * @param jedis
     * @param key
     * @param value
     * @param timeout
     * @param waitTime
     * @param timeUnit
     * @return
     * @throws InterruptedException
     */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /**
     * 错误的解锁方法—直接删除key
     *
     * @param key
     */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1] then " +
                "return redis.call(&#39;del&#39;,KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}

4. Controller层控制

定义一个TestController来测试我们实现的分布式锁

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}

以上是Redis分布式锁实例分析的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:亿速云。如有侵权,请联系admin@php.cn删除
REDIS:它如何充当数据存储和服务REDIS:它如何充当数据存储和服务Apr 24, 2025 am 12:08 AM

REDISACTSASBOTHADATASTOREANDASERVICE.1)ASADATASTORE,ITUSESIN-MEMORYSTOOGATOFORFOFFASTESITION,支持VariousDatharptructuresLikeKey-valuepairsandsortedsetsetsetsetsetsetsets.2)asaservice,ItprovidespunctionslikeItionitionslikepunikeLikePublikePublikePlikePlikePlikeAndluikeAndluAascriptingiationsmpleplepleclexplectiations

REDIS与其他数据库:比较分析REDIS与其他数据库:比较分析Apr 23, 2025 am 12:16 AM

Redis与其他数据库相比,具有以下独特优势:1)速度极快,读写操作通常在微秒级别;2)支持丰富的数据结构和操作;3)灵活的使用场景,如缓存、计数器和发布订阅。选择Redis还是其他数据库需根据具体需求和场景,Redis在高性能、低延迟应用中表现出色。

REDIS的角色:探索数据存储和管理功能REDIS的角色:探索数据存储和管理功能Apr 22, 2025 am 12:10 AM

Redis在数据存储和管理中扮演着关键角色,通过其多种数据结构和持久化机制成为现代应用的核心。1)Redis支持字符串、列表、集合、有序集合和哈希表等数据结构,适用于缓存和复杂业务逻辑。2)通过RDB和AOF两种持久化方式,Redis确保数据的可靠存储和快速恢复。

REDIS:了解NOSQL概念REDIS:了解NOSQL概念Apr 21, 2025 am 12:04 AM

Redis是一种NoSQL数据库,适用于大规模数据的高效存储和访问。1.Redis是开源的内存数据结构存储系统,支持多种数据结构。2.它提供极快的读写速度,适合缓存、会话管理等。3.Redis支持持久化,通过RDB和AOF方式确保数据安全。4.使用示例包括基本的键值对操作和高级的集合去重功能。5.常见错误包括连接问题、数据类型不匹配和内存溢出,需注意调试。6.性能优化建议包括选择合适的数据结构和设置内存淘汰策略。

REDIS:现实世界的用例和示例REDIS:现实世界的用例和示例Apr 20, 2025 am 12:06 AM

Redis在现实世界中的应用包括:1.作为缓存系统加速数据库查询,2.存储Web应用的会话数据,3.实现实时排行榜,4.作为消息队列简化消息传递。Redis的多功能性和高性能使其在这些场景中大放异彩。

REDIS:探索其功能和功能REDIS:探索其功能和功能Apr 19, 2025 am 12:04 AM

Redis脱颖而出是因为其高速、多功能性和丰富的数据结构。1)Redis支持字符串、列表、集合、散列和有序集合等数据结构。2)它通过内存存储数据,支持RDB和AOF持久化。3)从Redis6.0开始引入多线程处理I/O操作,提升了高并发场景下的性能。

Redis是SQL还是NOSQL数据库?答案解释了Redis是SQL还是NOSQL数据库?答案解释了Apr 18, 2025 am 12:11 AM

RedisisclassifiedasaNoSQLdatabasebecauseitusesakey-valuedatamodelinsteadofthetraditionalrelationaldatabasemodel.Itoffersspeedandflexibility,makingitidealforreal-timeapplicationsandcaching,butitmaynotbesuitableforscenariosrequiringstrictdataintegrityo

REDIS:提高应用程序性能和可扩展性REDIS:提高应用程序性能和可扩展性Apr 17, 2025 am 12:16 AM

Redis通过缓存数据、实现分布式锁和数据持久化来提升应用性能和可扩展性。1)缓存数据:使用Redis缓存频繁访问的数据,提高数据访问速度。2)分布式锁:利用Redis实现分布式锁,确保在分布式环境中操作的安全性。3)数据持久化:通过RDB和AOF机制保证数据安全性,防止数据丢失。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)