本文將詳細介紹redisson實現分散式鎖定原理。具有很好的參考價值,下面跟著小編一起來看下吧
Redisson分散式鎖
之前的基於註解的鎖有一種鎖是基本redis的分散式鎖,鎖的實現我是基於redisson元件提供的RLock,這篇來看看redisson是如何實現鎖的。
不同版本實現鎖的機制並不相同
引用的redisson最近發布的版本3.2.3,不同的版本可能實現鎖的機制並不相同,早期版本好像是採用簡單的setnx,getset等常規命令來配置完成,而後期由於redis支援了腳本Lua變更了實現原理。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.2.3</version> </dependency>
setnx需要配合getset以及事務來完成,這樣才能比較好的避免死鎖問題,而新版本由於支援lua腳本,可以避免使用事務以及操作多個redis命令,語義表達更加清晰一些。
RLock介面的特點
繼承標準介面Lock
擁有標準鎖定介面的所有特性,例如lock,unlock,trylock等等。
擴展標準接口Lock
擴展了很多方法,常用的主要有:強制鎖釋放,帶有效期的鎖,還有一組異步的方法。其中前面兩個方法主要是解決標準lock可能造成的死鎖問題。例如某個執行緒取得到鎖之後,執行緒所在機器死機,此時取得了鎖的執行緒無法正常釋放鎖導致其餘的等待鎖的執行緒一直等待下去。
可重入機制
各版本實現有差異,可重入主要考慮的是性能,同一線程在未釋放鎖時如果再次申請鎖資源不需要走申請流程,只需要將已經獲取的鎖繼續回傳並且記錄上已經重入的次數即可,與jdk裡面的ReentrantLock功能類似。重入次數靠hincrby指令來配合使用,詳細的參數下面的程式碼。
怎麼判斷是同一線程?
redisson的方案是,RedissonLock實例的一個guid再加當前執行緒的id,透過getLockName回傳
tryLock的原始碼來看:tryAcquire方法是申請鎖並返回鎖有效期還剩餘的時間,如果為空說明鎖未被其它線程申請直接獲取並返回,如果獲取到時間,則進入等待競爭邏輯。
public class RedissonLock extends RedissonExpirable implements RLock { final UUID id; protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L); this.commandExecutor = commandExecutor; this.id = id; } String getLockName(long threadId) { return this.id + ":" + threadId; }
無競爭,直接獲取鎖定
先看下首先獲取鎖定並釋放鎖背後的redis都在做什麼,可以利用redis的monitor來執行後台監控。當我們在方法了增加@RequestLockable之後,其實就是調用lock以及unlock,下面是redis命令:
加鎖由於高版本的redis支持lua腳本,所以redisson也對其進行了支持
由於高版本的redis支持lua腳本,所以redisson也對其進行了支持,採用了腳本模式,不熟悉lua腳本的可以去找下。執行lua指令的邏輯如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit);
if(ttl == null) {
//直接获取到锁
return true;
} else {
//有竞争的后续看
}
}
加鎖的流程:
告訴客戶端直接取得到鎖。 判斷lock鍵是否存在,存在則將重入次數加1,並重新設定過期時間,返回nil,告訴客戶端直接取得到鎖定。
被其它執行緒已經鎖定,返回鎖有效期的剩餘時間,告訴客戶端需要等待。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)}); }
上面的lua腳本會轉換成真正的redis命令,下面的是經過lua腳本運算之後實際執行的redis命令。
"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
解鎖
解鎖的流程,如果看起來複雜不是被當前執行緒鎖定,則返回nil
由於支援可重入,在解鎖時將重入次數需要減1如果計算後的重入次數>0,則重新設定過期時間
1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "346e1eb8-5bfd-4d49-9870-042df402f248:21" "1" 1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"
主要是發送一個解鎖的訊息,以此喚醒等待隊列中的執行緒重新競爭鎖。
"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
有竞争,等待
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。
this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
this.await返回true,进入循环尝试获取锁。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { return true; } else { //重点是这段 time -= System.currentTimeMillis() - current; if(time <= 0L) { return false; } else { current = System.currentTimeMillis(); final RFuture subscribeFuture = this.subscribe(threadId); if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if(!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if(subscribeFuture.isSuccess()) { RedissonLock.this.unsubscribe(subscribeFuture, threadId); } } }); } return false; } else { boolean var16; try { time -= System.currentTimeMillis() - current; if(time <= 0L) { boolean currentTime1 = false; return currentTime1; } do { long currentTime = System.currentTimeMillis(); ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { var16 = true; return var16; } time -= System.currentTimeMillis() - currentTime; if(time <= 0L) { var16 = false; return var16; } currentTime = System.currentTimeMillis(); if(ttl.longValue() >= 0L && ttl.longValue() < time) { this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS); } else { this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); var16 = false; } finally { this.unsubscribe(subscribeFuture, threadId); } return var16; } } } }
循环尝试一般有如下几种方法:
while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。
Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。
基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。
redisson依赖
由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。
以上就是redisson实现分布式锁原理详解的内容,更多相关内容请关注PHP中文网(www.php.cn)!