>  기사  >  데이터 베이스  >  Redis 분산 잠금 구현 방법 소개

Redis 분산 잠금 구현 방법 소개

尚
앞으로
2019-12-05 17:09:383550검색

Redis 분산 잠금 구현 방법 소개

1. 분산 잠금을 사용하려면 몇 가지 조건을 충족해야 합니다.

1. 시스템은 분산 시스템입니다(키가 분산되어 있으며, ReentrantLock 또는 동기화된 코드 블록을 사용하여 독립 실행형을 구현할 수 있음)

2 , 공유 리소스(각 시스템은 동일한 리소스에 액세스하며 리소스의 전달자는 기존 관계형 데이터베이스 또는 NoSQL일 수 있음)

3. 동기 액세스(즉, 동일한 공유 리소스에 공동 액세스하는 많은 프로세스가 있음) 액세스, 누가 리소스에 관심이 있는지 경쟁 여부)

2. 애플리케이션 시나리오 예

관리 백엔드의 배포 아키텍처(여러 Tomcat 서버 + redis [여러 Tomcat 서버가 하나의 Redis에 액세스] + mysql [여러 Tomcat 서버가 하나에 액세스) server] mysql])은 분산 잠금 사용 조건을 만족합니다. 여러 서버가 Redis 글로벌 캐시의 리소스에 액세스해야 합니다. 분산 잠금을 사용하지 않으면 문제가 발생합니다. 다음 의사 코드를 살펴보세요.

long N=0L;
//N从redis获取值
if(N<5){
N++;
//N写回redis
}

위 코드의 주요 기능은 다음과 같습니다.

redis에서 N 값을 가져오고, N 값에 대한 경계 검사를 수행하고, 이 값을 1씩 증가시킨 다음 N을 다시 redis에 씁니다. 이 애플리케이션 시나리오는 플래시 세일, 글로벌 증분 ID, IP 액세스 제한 등과 같이 매우 일반적입니다.

IP 액세스 제한 측면에서 악의적인 공격자는 무제한 액세스를 시작할 수 있으며 동시성 양은 상대적으로 큽니다. Redis에서 읽은 N은 이미 더티 데이터일 수 있기 때문에 N의 경계 검사는 분산 환경에서 신뢰할 수 없습니다.

기존 잠금 방법(예: Java의 동기화 및 잠금)은 분산 환경이므로 쓸모가 없으며, 이 동기화 문제에 맞서 싸우는 소방관은 무력합니다. 삶과 죽음의 중요한 시기에 마침내 분산 잠금 장치가 작동하게 됩니다.

분산 잠금은 사육사, 재배포 등 다양한 방법으로 구현할 수 있습니다. 어느 쪽이든 기본 원칙은 동일합니다. 상태 값은 잠금을 나타내는 데 사용되며 잠금의 점유 및 해제는 상태 값으로 식별됩니다.

여기에서는 Redis를 사용하여 분산 잠금을 구현하는 방법에 대해 주로 설명합니다.

3. Redis의 setNX 명령을 사용하여 분산 잠금을 구현합니다

1. 구현 원리

Redis는 대기열 모드를 사용하여 동시 액세스를 직렬 액세스로 전환하는 단일 프로세스 단일 스레드 모드입니다. Redis에 액세스하세요. 연결에 대한 경쟁이 없습니다. Redis의 SETNX 명령은 분산 잠금을 쉽게 구현할 수 있습니다.

2. 기본 명령 분석

1) setNX(존재하지 않는 경우 SET)

구문: ​​

SETNX key value

키가 존재하지 않는 경우에만 키 값을 value로 설정합니다.

주어진 키가 이미 존재하는 경우 SETNX는 아무런 조치도 취하지 않습니다.

SETNX는 "SET if Not eXists"의 약자입니다(존재하지 않으면 SET)

반환값:

설정에 성공하면 1을 반환합니다.

설정하지 못했습니다. 0을 반환합니다.

예:

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 设置成功
(integer) 1

redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0

redis> GET job                   # 没有被覆盖
"programmer"

따라서 다음 명령을 실행합니다.

SETNX lock.foo <current Unix time + lock timeout + 1>

1이 반환되면 클라이언트는 잠금을 획득하고 lock.foo의 키 값을 클라이언트가 잠겨 있음을 나타내는 시간 값으로 설정합니다. 마지막으로 잠금은 DEL lock.foo를 통해 해제될 수 있습니다.

0이 반환되면 다른 클라이언트가 잠금을 획득했다는 의미입니다. 이때 먼저 반환하거나 다시 시도하고 상대방이 완료될 때까지 기다리거나 잠금 시간이 초과될 때까지 기다릴 수 있습니다.

2) getSET

구문:

GETSET key value

주어진 키의 값을 value로 설정하고 키의 이전 값을 반환합니다.

키가 존재하지만 문자열 유형이 아닌 경우 오류가 반환됩니다.

반환 값:

주어진 키의 이전 값을 반환합니다.

key에 이전 값이 없는 경우, 즉 key가 존재하지 않는 경우 nil이 반환됩니다.

3) get

구문:

GET key

반환값:

키가 없으면 nil을 반환하고, 없으면 키의 값을 반환합니다.

키가 문자열 유형이 아니면 오류가 반환됩니다

4. 교착 상태를 해결합니다.

위 잠금 논리에 문제가 있습니다. 잠금을 보유한 클라이언트가 실패하거나 충돌하여 해제할 수 없는 경우 잠금?

락 키에 해당하는 타임스탬프를 통해 이러한 일이 발생했는지 판단할 수 있습니다. 현재 시간이 lock.foo의 값보다 크다면 잠금이 만료되어 재사용이 가능하다는 의미입니다.

이런 경우 단순히 DEL을 통해 잠금을 삭제한 다음 다시 SETNX를 통해 잠금을 삭제할 수는 없습니다. (합리적으로 잠금 삭제 작업은 잠금 소유자가 수행해야 합니다. 여기서는 시간이 지날 때까지 기다리면 됩니다. out) 여러 클라이언트가 시간 초과를 감지한 후 잠금을 해제하려고 하면 여기에서 경쟁 조건이 발생할 수 있습니다. 이 시나리오를 시뮬레이션해 보겠습니다.

C0 작업이 시간 초과되었지만 여전히 잠금이 유지됩니다. C2. lock.foo를 읽고 타임스탬프를 확인한 후 시간이 초과되었는지 확인하세요.
C1은 DEL lock.foo를 보냅니다.
C1은 SETNX lock.foo를 보내고 성공합니다.
C2는 DEL lock.foo를 보냅니다.
C2는 SETNX lock.foo를 보내고 성공합니다.
이렇게 하면 C1과 C2 모두 잠금을 얻게 됩니다! 큰 문제!

다행히도 이 문제는 C3 클라이언트가 어떻게 수행하는지 살펴보겠습니다.

C3은 잠금을 얻기 위해 SETNX lock.foo를 보냅니다. Redis는 C3에게 0을 반환합니다.

C3发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。 

反之,如果已超时,C3通过下面的操作来尝试获得锁: 

GETSET lock.foo  

通过GETSET,C3拿到的时间戳如果仍然是超时的,那就说明,C3如愿以偿拿到锁了。 

如果在C3之前,有个叫C4的客户端比C3快一步执行了上面的操作,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期获得锁,需要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。  

五、代码实现

expireMsecs 锁持有超时,防止线程在入锁以后,无限的执行下去,让锁无法释放 
timeoutMsecs 锁等待超时,防止线程饥饿,永远没有入锁执行代码的机会 

注意:项目里面需要先搭建好redis的相关配置

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis distributed lock implementation.
 *
 * @author zhengcanrui
 */
public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private RedisTemplate redisTemplate;

    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁以后,无限的执行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
        this(redisTemplate, lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
        this(redisTemplate, lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }

    private String get(final String key) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] data = connection.get(serializer.serialize(key));
                    connection.close();
                    if (data == null) {
                        return null;
                    }
                    return serializer.deserialize(data);
                }
            });
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }

    private boolean setNX(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return success;
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (Boolean) obj : false;
    }

    private String getSet(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return serializer.deserialize(ret);
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (String) obj : null;
    }

    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * Acqurired lock release.
     */
    public synchronized void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}

调用:

RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
 try {
            if(lock.lock()) {
                   //需要加锁的代码
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,
            //操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。 ————这里没有做
            lock.unlock();
        }

六、一些问题

1、为什么不直接使用expire设置超时时间,而将时间的毫秒数其作为value放在redis中?

如下面的方式,把超时的交给redis处理:

lock(key, expireSec){
isSuccess = setnx key
if (isSuccess)
expire key expireSec
}

这种方式貌似没什么问题,但是假如在setnx后,redis崩溃了,expire就没有执行,结果就是死锁了。锁永远不会超时。

 2、为什么前面的锁已经超时了,还要用getSet去设置新的时间戳的时间获取旧的值,然后和外面的判断超时时间的时间戳比较呢?

Redis 분산 잠금 구현 방법 소개

因为是分布式的环境下,可以在前一个锁失效的时候,有两个进程进入到锁超时的判断。如:

C0超时了,还持有锁,C1/C2同时请求进入了方法里面

C1/C2获取到了C0的超时时间

C1使用getSet方法

C2也执行了getSet方法

假如我们不加 oldValueStr.equals(currentValueStr) 的判断,将会C1/C2都将获得锁,加了之后,能保证C1和C2只能一个能获得锁,一个只能继续等待。

注意:这里可能导致超时时间不是其原本的超时时间,C1的超时时间可能被C2覆盖了,但是他们相差的毫秒及其小,这里忽略了。

更多redis知识请关注redis入门教程栏目。

위 내용은 Redis 분산 잠금 구현 방법 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 cnblogs.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제