Um sicherzustellen, dass in einer Multithread-Umgebung nur ein Thread gleichzeitig auf einen Codeblock zugreifen kann, können wir in Java im Allgemeinen synchronisierte Syntax und ReetrantLock verwenden, um sicherzustellen, dass dies tatsächlich der Fall ist eine lokale Sperrmethode. Aber jetzt übernehmen Unternehmen eine verteilte Architektur. Wie kann sichergestellt werden, dass Threads auf verschiedenen Knoten gleichzeitig ausgeführt werden? Daher werden verteilte Sperren eingeführt, mit denen der gegenseitig ausschließende Zugriff auf gemeinsam genutzte Ressourcen zwischen verteilten Systemen gesteuert werden kann. In einem verteilten System werden mehrere Dienste auf mehreren Computern bereitgestellt. Wenn ein Benutzer auf dem Client eine Dateneinfügungsanforderung initiiert und es keine Garantie für einen verteilten Sperrmechanismus gibt, können mehrere Dienste auf diesen mehreren Computern zu wiederholten Einfügevorgängen führen Das Einfügen von Daten kann für einige Unternehmen, die keine redundanten Daten zulassen, zu Problemen führen. Der verteilte Sperrmechanismus soll Probleme wie dieses lösen und den gegenseitigen Zugriff auf gemeinsam genutzte Ressourcen zwischen mehreren Diensten sicherstellen. Wenn ein Dienst die verteilte Sperre übernimmt und andere Dienste die Sperre nicht erhalten, werden keine weiteren Vorgänge ausgeführt. Die allgemeine Bedeutung ist in der folgenden Abbildung dargestellt:
Eigenschaften verteilter Sperren
Wir implementieren verteilte Sperren im Allgemeinen auf folgende Weise:
Basierend auf der Datenbank
Basierend auf Redis
Basierend auf zookeeper
Apropos von Redis verteilte Sperre, die meisten Leute denken an: setnx+lua
(Redis garantiert, dass beim Ausführen des Lua-Skripts keine anderen Vorgänge ausgeführt werden, um die Atomizität der Operation sicherzustellen), oder sie kennen set Schlüsselwert px Millisekunden nx
code>. Der Kernimplementierungsbefehl der letztgenannten Methode lautet wie folgt:
- 获取锁(unique_value可以是UUID等) SET resource_name unique_value NX PX 30000 - 释放锁(lua脚本中,一定要比较value,防止误解锁) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
set key value px Millisekunden nx;<p></p>
setnx+lua
(redis保证执行lua脚本时不执行其他操作,保证操作的原子性),或者知道set key value px milliseconds nx
。后一种方式的核心实现命令如下:
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
这种实现方式有3大要点(也是面试概率非常高的地方):
set命令要用
Tatsächlich der größte Nachteil von Diese Art von Sperre funktioniert nur, wenn sie gesperrt ist. Auf einem Redis-Knoten geht die Sperre verloren, auch wenn Redis über Sentinel eine hohe Verfügbarkeit gewährleistet. Wenn der Master-Knoten aus irgendeinem Grund zwischen Master und Slave wechselt, geht die Sperre verloren:set key value px milliseconds nx
- Erhalten am die Redis-Master-Knotensperre;
- Der gesperrte Schlüssel wurde jedoch nicht mit dem Slave-Knoten synchronisiert; das Schloss geht verloren.
Um das Single-Point-of-Failure-Problem zu vermeiden, schlug der Redis-Autor Antirez eine fortschrittlichere Methode zur Implementierung verteilter Sperren vor, die auf der verteilten Umgebung basiert:TTLRedlock
Versuchen Sie, Sperren von 5 Instanzen nacheinander zu erhalten, indem Sie denselben Schlüssel und einen „einzigartigen Wert“ (z. B. UUID) verwenden. Wenn der Client eine Sperre von Redis anfordert, sollte er ein Netzwerkverbindungs- und Antwort-Timeout festlegen, das kürzer als die Ablaufzeit der Sperre sein sollte. Wenn die automatische Ablaufzeit- . Redlock ist auch die einzige Möglichkeit unter allen verteilten Sperrimplementierungen in Redis, die den Interviewer zum Höhepunkt bringen kann. Redis Advanced Distributed Lock: Redlock
- Der Redlock-Algorithmus von Antirez sieht ungefähr so aus: In der verteilten Umgebung von Redis gehen wir davon aus, dass es N Redis-Master gibt. Diese Knoten sind völlig unabhängig voneinander und es gibt keine Master-Slave-Replikation oder andere Cluster-Koordinierungsmechanismen. Wir stellen sicher, dass wir zum Erwerb und Freigeben von Sperren für N-Instanzen dieselbe Methode verwenden wie bei einer Redis-Einzelinstanz. Nun gehen wir davon aus, dass es 5 Redis-Masterknoten gibt, und wir müssen diese Redis-Instanzen auf 5 Servern ausführen, um sicherzustellen, dass nicht alle gleichzeitig ausfallen.
- Um die Sperre zu erhalten, sollte der Client die folgenden Vorgänge ausführen:
- Ermitteln Sie die aktuelle Unix-Zeit in Millisekunden.
Ihrer Sperre beispielsweise 10 Sekunden beträgt, sollte das Zeitlimit zwischen 5 und 50 Millisekunden liegen. Dadurch kann vermieden werden, dass sich das serverseitige Redis aufhängt und der Client immer noch auf das Antwortergebnis wartet. Wenn der Server nicht innerhalb der angegebenen Zeit antwortet, sollte der Client versuchen, so schnell wie möglich eine Sperre von einer anderen Redis-Instanz zu erhalten. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。 此处不讨论时钟漂移 redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。 首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似: 实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中: 获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s: 获取锁的命令中, KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY; ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s; ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId: 释放锁的代码为redLock.unlock(),核心源码如下: 下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。 自定义一个注解,被注解的方法会执行获取分布式锁的逻辑 在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下: 定义一个TestController来测试我们实现的分布式锁Redlock源码
1. Redlock依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
2. Redlock用法
Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
.setMasterName("masterName")
.setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
isLock = redLock.tryLock();
// 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
}
3. Redlock唯一ID
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
4. Redlock获取锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 获取锁时向5个redis实例发送的命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
5. Redlock释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 向5个redis实例都执行如下命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Redis实现的分布式锁轮子
1. 自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
/**
* 业务键
*
* @return
*/
String key();
/**
* 锁的过期秒数,默认是5秒
*
* @return
*/
int expire() default 5;
/**
* 尝试加锁,最多等待时间
*
* @return
*/
long waitTime() default Long.MIN_VALUE;
/**
* 锁的超时时间单位
*
* @return
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
2. AOP拦截器实现
@Aspect
@Component
public class LockMethodAspect {
@Autowired
private RedisLockHelper redisLockHelper;
@Autowired
private JedisUtil jedisUtil;
private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);
@Around("@annotation(com.redis.lock.annotation.RedisLock)")
public Object around(ProceedingJoinPoint joinPoint) {
Jedis jedis = jedisUtil.getJedis();
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
RedisLock redisLock = method.getAnnotation(RedisLock.class);
String value = UUID.randomUUID().toString();
String key = redisLock.key();
try {
final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
logger.info("isLock : {}",islock);
if (!islock) {
logger.error("获取锁失败");
throw new RuntimeException("获取锁失败");
}
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException("系统异常");
}
} finally {
logger.info("释放锁");
redisLockHelper.unlock(jedis,key, value);
jedis.close();
}
}
}
3. Redis实现分布式锁核心类
@Component
public class RedisLockHelper {
private long sleepTime = 100;
/**
* 直接使用setnx + expire方式获取分布式锁
* 非原子性
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
Long result = jedis.setnx(key, value);
// result = 1时,设置成功,否则设置失败
if (result == 1L) {
return jedis.expire(key, timeout) == 1L;
} else {
return false;
}
}
/**
* 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
*
* @param jedis
* @param key
* @param UniqueId
* @param seconds
* @return
*/
public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(UniqueId);
values.add(String.valueOf(seconds));
Object result = jedis.eval(lua_scripts, keys, values);
//判断是否成功
return result.equals(1L);
}
/**
* 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
long seconds = timeUnit.toSeconds(timeout);
return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
}
/**
* 自定义获取锁的超时时间
*
* @param jedis
* @param key
* @param value
* @param timeout
* @param waitTime
* @param timeUnit
* @return
* @throws InterruptedException
*/
public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
long seconds = timeUnit.toSeconds(timeout);
while (waitTime >= 0) {
String result = jedis.set(key, value, "nx", "ex", seconds);
if ("OK".equals(result)) {
return true;
}
waitTime -= sleepTime;
Thread.sleep(sleepTime);
}
return false;
}
/**
* 错误的解锁方法—直接删除key
*
* @param key
*/
public void unlock_with_del(Jedis jedis,String key) {
jedis.del(key);
}
/**
* 使用Lua脚本进行解锁操纵,解锁的时候验证value值
*
* @param jedis
* @param key
* @param value
* @return
*/
public boolean unlock(Jedis jedis,String key,String value) {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
}
}
4. Controller层控制
@RestController
public class TestController {
@RedisLock(key = "redis_lock")
@GetMapping("/index")
public String index() {
return "index";
}
}
Das obige ist der detaillierte Inhalt vonAnalyse der verteilten Redis-Sperreninstanz. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!