ホームページ  >  記事  >  Java  >  redisson によって実装された分散ロックの原理の詳細な説明

redisson によって実装された分散ロックの原理の詳細な説明

黄舟
黄舟オリジナル
2017-03-07 10:26:382797ブラウズ

この記事では、redisson による分散ロック実装の原理を詳しく紹介します。非常に優れた参考値です。以下のエディターで見てみましょう

Redisson 分散ロック

先ほどのアノテーションベースのロックには、基本的な Redis 分散ロックであるロックが実装されています。この記事では、コンポーネントによって提供される redisson RLock に基づいて、redisson がロックを実装する方法について説明します。

バージョンが異なれば、ロック機構も異なります。

は、最近リリースされた redisson のバージョン 3.2.3 を指します。初期のバージョンでは、単純な setnx、getset が使用されるようです。通常のコマンドが実行されるまで待ちます。その後、redis がスクリプト Lua をサポートしたため、実装原則が変更されました。

<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.2.3</version>
</dependency>

setnx は、デッドロックの問題を回避するために getset とトランザクションで完了する必要があります。新しいバージョンでは、トランザクションの使用や複数の Redis コマンドの操作を回避できる Lua スクリプトがサポートされており、セマンティック表現がより明確になっています。 。

RLockインターフェースの特徴

標準インターフェースLockを継承

ロック、ロック解除、トライロックなど、標準ロックインターフェースのすべての機能を備えています。

標準インターフェイス Lock を拡張しました

多くのメソッドを拡張しました。最も一般的に使用されるものは、強制ロック解放、有効期間付きロック、および一連の非同期メソッドです。最初の 2 つの方法は主に、標準のロックによって発生する可能性のあるデッドロックの問題を解決するためのものです。たとえば、スレッドがロックを取得した後、そのスレッドが存在するマシンがクラッシュします。このとき、ロックを取得したスレッドはロックを正常に解放できず、ロックを待っている残りのスレッドが待機することになります。

再入可能メカニズム

各バージョンの実装には違いがあります。再入可能性の主な考慮事項は、ロックを解放する前に同じスレッドが再度ロック リソースを適用する場合、そのプロセスを通過する必要はありません。アプリケーション プロセスは、ロック リソースを取得するだけで済みます。ロックは、jdk の ReentrantLock 関数と同様に、再エントリ数を返して記録し続けることができます。再エントリ数は、hincrby コマンドと組み合わせて使用​​されます。詳細なパラメーターは次のコードにあります。

同じスレッドかどうかを確認するにはどうすればよいですか?

redissonの解決策は、RedissonLockインスタンスのguidと現在のスレッドのIDを追加し、getLockNameを通してそれを返すことです

public class RedissonLock extends RedissonExpirable implements RLock {
 final UUID id;
 protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
  super(commandExecutor, name);
  this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);
  this.commandExecutor = commandExecutor;
  this.id = id;
 }
 String getLockName(long threadId) {
  return this.id + ":" + threadId;
 }

RLockがロックを取得するための2つのシナリオ

ここから入手してください tryLock のソース コードを見ると、tryAcquire メソッドはロックに適用され、ロック有効期間の残り時間を返します。それが空の場合、ロックが直接取得されておらず、返されていないことを意味します。他のスレッドで時間を取得すると、待ちの競争ロジックに入ります。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = this.tryAcquire(leaseTime, unit);
  if(ttl == null) {
   //直接获取到锁
   return true;
  } else {
   //有竞争的后续看
  }
 }

競合なし、ロックを直接取得します

まず、ロックの取得とロックの解放の背後で Redis が何をしているかを見てみましょう。 Redis モニターを使用して、Redis の実行を監視できます。背景。 @RequestLockable をメソッドに追加すると、実際に lock と lock が呼び出されます。 以下は redis コマンドです:

lock

redis の上位バージョンは lua スクリプトをサポートするため、redisson もそれをサポートします。 Lua スクリプトに詳しくない場合は、調べてください。 lua コマンドを実行するロジックは次のとおりです。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
    "if (redis.call(\&#39;exists\&#39;, KEYS[1]) == 0) then redis.call(\&#39;hset\&#39;, KEYS[1], ARGV[2], 1); 
    redis.call(\&#39;pexpire\&#39;, KEYS[1], ARGV[1]); return nil; end; 
    if (redis.call(\&#39;hexists\&#39;, KEYS[1], ARGV[2]) == 1) then redis.call(\&#39;hincrby\&#39;, KEYS[1], ARGV[2], 1); 
    redis.call(\&#39;pexpire\&#39;, KEYS[1], ARGV[1]); return nil; end; 
    return redis.call(\&#39;pttl\&#39;, KEYS[1]);", 
    Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});
  }

ロック プロセス:

  1. ロック キーが存在するかどうかを確認し、存在しない場合は、hset を直接呼び出して現在のスレッド情報を保存します。有効期限を設定し、クライアントにロックを直接取得するように指示します。

  2. ロックキーが存在するかどうかを確認し、存在する場合は、再エントリの回数を 1 つ増やし、有効期限をリセットし、nil を返し、クライアントにロックを直接取得するように指示します。

  3. は他のスレッドによってロックされており、ロック有効期間の残り時間を返し、クライアントに待機する必要があることを伝えます。

"EVAL" 
"if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then 
redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); 
redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); 
return nil; end;
if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then 
redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); 
redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); 
return nil; end;
return redis.call(&#39;pttl&#39;, KEYS[1]);"
 "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 
 "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"

上記のLuaスクリプトは、実際のredisコマンドに変換されます。以下は、Luaスクリプトの操作後に実行される実際のredisコマンドです。

1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 
"346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"
1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"

ロック解除

ロック解除のプロセスは少し複雑に見えます:

  1. ロックキーが存在しない場合は、ロックが利用可能であることを示すメッセージを送信します

  2. ロックキーが存在しない場合 現在のスレッドがロックされている場合はnilが返されます

  3. リエントランシーをサポートしているため、ロック解除時にリエントリー回数を1減らす必要があります

  4. 計算されたリエントリー回数の場合> 0、有効期限はリセットされます

  5. 計算された再エントリ数

"EVAL" 
"if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then
 redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]);
 return 1; end;
if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then 
return nil;end; 
local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1); 
if (counter > 0) then redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]); return 0; 
else redis.call(&#39;del&#39;, KEYS[1]); redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); return 1; end; 
return nil;"
"2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 
"redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"
 "0" "1000"
 "346e1eb8-5bfd-4d49-9870-042df402f248:21"

競合せずに redis コマンドのロックを解除します:

主にロック解除メッセージを送信してウェイクアップします。待機キュー内のスレッドが再度ロックを獲得するために競合します。

りー

有竞争,等待

有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。

  • this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败

  • this.await返回true,进入循环尝试获取锁。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(leaseTime, unit);
    if(ttl == null) {
      return true;
    } else {
      //重点是这段
      time -= System.currentTimeMillis() - current;
      if(time <= 0L) {
        return false;
      } else {
        current = System.currentTimeMillis();
        final RFuture subscribeFuture = this.subscribe(threadId);
        if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
          if(!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener() {
              public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                if(subscribeFuture.isSuccess()) {
                  RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                }
              }
            });
          }
          return false;
        } else {
          boolean var16;
          try {
            time -= System.currentTimeMillis() - current;
            if(time <= 0L) {
              boolean currentTime1 = false;
              return currentTime1;
            }
            do {
              long currentTime = System.currentTimeMillis();
              ttl = this.tryAcquire(leaseTime, unit);
              if(ttl == null) {
                var16 = true;
                return var16;
              }
              time -= System.currentTimeMillis() - currentTime;
              if(time <= 0L) {
                var16 = false;
                return var16;
              }
              currentTime = System.currentTimeMillis();
              if(ttl.longValue() >= 0L && ttl.longValue() < time) {
                this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
              } else {
                this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
              }
              time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);
            var16 = false;
          } finally {
            this.unsubscribe(subscribeFuture, threadId);
          }
          return var16;
        }
      }
    }
  }

循环尝试一般有如下几种方法:

  • while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。

  • Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。

  • 基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。

redisson依赖

由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。

 以上就是redisson实现分布式锁原理详解的内容,更多相关内容请关注PHP中文网(www.php.cn)!


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。