ホームページ >データベース >Redis >Redis でメッセージ キューを使用する方法の簡単な分析

Redis でメッセージ キューを使用する方法の簡単な分析

青灯夜游
青灯夜游転載
2022-01-05 09:57:352757ブラウズ

この記事では、Redis の高度な使用法 (メッセージ キュー) を説明し、Redis の遅延キューについて紹介します。

Redis でメッセージ キューを使用する方法の簡単な分析

メッセージ キュー ミドルウェアというと、アプリケーションに非同期メッセージング機能を実装するための RabbitMQ、RocketMQ、Kafka を誰もが思い浮かべます。これらは、私たちが理解できる以上の機能を備えた特殊なメッセージ キュー ミドルウェアです。

これらのメッセージ ミドルウェアの使用は、RabbitMQ など複雑です。メッセージを送信する前に、Exchange と Queue を作成し、いくつかのルールに従って Exchange と Queue をバインドする必要があります。ルーティング キーと制御ヘッダー メッセージを定式化する必要もあります。これはプロデューサーのみが対象ですが、コンシューマーもメッセージを消費する前に上記の一連の面倒な手順を再度実行する必要があります。

したがって、100% の信頼性を必要とせず、単純なメッセージ キュー要件を実装したい人は、Redis を使用してメッセージ キュー ミドルウェアの面倒な手順から解放できます。

Redis のメッセージ キューは専門的なメッセージ キューではないため、メッセージ キューに多くの高度な機能はなく、ACK 保証もありません。メッセージの信頼性を究極的に追求する場合は、プロフェッショナルな MQ ミドルウェアをご利用ください。 [関連する推奨事項: Redis ビデオ チュートリアル ]

非同期メッセージ キュー

最も単純な非同期メッセージ キューから始まり、Redis リストのデータ構造は一般的に次のようになります。 used 非同期メッセージ キューとして、エンキューには lrpush/lpush が使用され、デキューには rpop/lpop が使用されます。

Redis でメッセージ キューを使用する方法の簡単な分析

#質問 1: 空のキュー

pop 操作の場合、メッセージ キューが空の場合、クライアントは無限に陥ります。ポップのループにより、人命を無駄にするアイドル ポーリングが大量に発生し、クライアントの CPU が増加し、同時に Redis の QPS も増加します。

上記の問題の解決策は、リスト構造の blpop/brpop を使用してデキューすることです。ここで、接頭辞 b はブロック、読み取りのブロックを表します。ブロッキング読み取りの場合、キューにデータがない場合はスリープ状態に入り、データが到着するとすぐにウェイクアップします。上記の問題を完全に解決します。

問題 2: アイドル接続が切断される

ブロッキング読み取りソリューションは完璧に見えますが、アイドル接続という別の問題が発生します。スレッドがどこかでブロックし続けると、Redis クライアント接続はアイドル接続になります。アイドル時間が長すぎる場合、Redis サーバーはアイドル リソースの占有を減らすために積極的に切断します。このとき、blpop/brpop は例外をスローします。

したがって、クライアント (アプリケーション) コンシューマーを作成するときは注意し、例外のキャッチに注意して再試行する必要があります。

アプリケーション 1: 遅延キュー

Redis 分散ロックでは、通常、ロックの失敗に対処する 3 つの戦略があります。

  • 例外を直接スローすると、フロントエンドが操作を続行するかどうかをユーザーに通知します;

  • しばらくスリープして再試行してください;

  • リクエストを遅延キューに入れて、しばらくしてから再試行します;

Redis の遅延キューについては、zset (順序付きリスト) データ構造を通じて実装できます。 。 zse の値としてメッセージを文字列としてシリアル化し、メッセージの有効期限処理時間 (遅延時間) をスコアとしてシリアル化します。次に、zset をポーリングして処理の有効期限を取得し、正常に消費されたことを示すために zset から zrem を通じてキーを削除して、タスクを処理します。

コア コードは次のとおりです:

// 生产\
public void delay(T msg) {\
  TaskItem task = new TaskItem();\
  task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
  task.msg = msg;\
  String s = JSON.toJSONString(task); // fastjson 序列化\
  jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\
}\
// 消费\
public void loop() {\
  while (!Thread.interrupted()) {\
   // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\
   Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
   if (values.isEmpty()) {\
     try {\
       Thread.sleep(500); // 歇会继续\
    }\
     catch (InterruptedException e) {\
       break;\
    }\
     continue;\
  }\
   String s = values.iterator().next();  //消费队列\
   if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\
     TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
     this.handleMsg(task.msg);\
  }\
}\
}

上記のコードは、同じタスクが複数のスレッドによって競合される状況のためにマルチスレッドで使用されます。ただし、タスクの処理を回避できます。タスク処理後のzremによる複数の消費状況。しかし、タスクを取得しても正常に消費できなかったスレッドにとって、タスクを取得するのは時間の無駄でした。したがって、lua スクリプトを使用してこのロジックを最適化することを検討できます。アトミック操作のために zrangeByScore と zrem を一緒にサーバーに移動すると、問題は完全に解決されます。

プログラミング関連の知識について詳しくは、

プログラミング入門をご覧ください。 !

以上がRedis でメッセージ キューを使用する方法の簡単な分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。