ホームページ >Java >&#&ベース >ついにここに...RocketMQ リテラシーの章

ついにここに...RocketMQ リテラシーの章

coldplay.xixi
coldplay.xixi転載
2020-10-20 17:16:393092ブラウズ

java Basic Tutorial コラムでは、今日の RocketMQ に関する知識を詳しく紹介します。

ついにここに...RocketMQ リテラシーの章

ブログを書くのは久しぶりです。ブログを書かない理由は数え切れないほどありますが、結局のところ、やはり「怠惰」です。今日はやっと怠け者のガンを治す薬を飲み、ブログを書くことにしました。何を紹介しようか? いろいろ考えた結果、RocketMQ を紹介することにします。結局、30 個以上のブログを書いてきましたが、まだ MQ についてまともなブログを書いたことはありません。このブログは比較的基本的なものであり、ソース コードの分析は含まれておらず、リテラシーだけが含まれています。

MQ の用途について

デカップリング

ある視点から見ると、マイクロサービスが MQ の活発な開発を促進したと思います。本来、システムには N 個の複数のモジュールがあり、すべてのモジュールは強く結合されています。マイクロサービスでは、モジュールはシステムであり、システムは間違いなく対話する必要があります。対話には 3 つの一般的な方法があります。1 つは RPC、1 つは HTTP、もう 1 つは MQ です。

非同期

当初、ビジネスは N のステップに分割されており、最終結果をユーザーに返す前に段階的に処理する必要がありましたが、MQ では最も重要な部分が実現されました。が最初に処理され、その後 MQ にメッセージを送信し、ユーザーに直接 OK を返します。以降の手順については、バックグラウンドでゆっくりと処理しましょう。これはユーザー エクスペリエンスを向上させるための成果物です。

ピーク クリッピング

特定のインターフェイスに対するリクエストの数が突然急増すると、必然的にアプリケーション サーバーとデータベース サーバーに多大な負荷がかかります。バックグラウンドでゆっくりと処理されるため、どれだけのリクエストが来るかを気にする必要はありません。

RocketMQ の概要

RocketMQ は Java で書かれており、Alibaba のオープン ソース メッセージ ミドルウェアであり、Kafka の利点の多くを吸収しています。 Kafka も人気のあるメッセージ ミドルウェアですが、Kafka は Scala で書かれているため、Java プログラマーがソース コードを読むのには適していません。また、Java プログラマーがカスタマイズされた開発を行うのにも適していません。 Kafka に触れたことのある友人は、Kafka を使いこなすのが簡単ではないことを知っています。比較的言えば、RocketMQ ははるかに単純であり、RocketMQ は Alibaba の恩恵を受けており、N Double 11 のテストを経験しています。国内のインターネット企業により適しています。なので国内でも使われており、RocketMQの会社はたくさんあります。

RocketMQ の 4 つの主要コンポーネント

ついにここに...RocketMQ リテラシーの章画像は gitee.com/mirrors/roc からのものです...

RocketMQ には 4 つの主要コンポーネントがあることがわかります。 :

NameServer

    #ステートレス サービスである登録センターはクラスターに展開できますが、NameServer ノード間でのデータのやり取りはありません。
  • Borker は、トピック ルーティング情報をすべてのネームサーバーに定期的に報告します。プロデューサーとコンシューマーは、ルーティング情報を定期的に更新するためにネームサーバー トピックをランダムに選択します。
  • トピック ルーティング情報は、NameServer クラスター内の結果整合性を採用します。
  • 保証されたAP。
Borker

    RocketMQ のサーバーは、メッセージの保存と配布に使用されます。
  • Borker は、所有するすべてのトピック ルーティング情報を NameServer に定期的に報告します。
  • ボーカーには、マスターとフォロワーの 2 つの役割があります。マスターは、読み取り (メッセージの消費) 操作と書き込み (メッセージの生成) 操作を担当します。マスターがビジーまたは応答できない場合、フォロワーは読み取り操作を引き受けることができます。 BorkerId=0 は Matser、BorkerId!=0 は Follower を意味します。 まず、これまでのところ、BorkerId=1 を持つフォロワーのみが読み取り操作を実行できます。 第 2 に、マスター ノードがハングアップした場合のフォロワーからマスターへの自動アップグレードをサポートしているのは、RocketMQ の上位バージョンのみです。
プロデューサー

プロデューサーは、定期的にネームサーバーへのトピック ルーティング情報のクエリを開始します。

コンシューマ

コンシューマは、定期的にネームサーバーへのトピックルーティング情報のクエリを開始します。

登録センターが Zookeeper を使用しない理由

実際、RocketMQ の下位バージョンでは確かに Zookeeper が登録センターとして使用されていましたが、後に現在の NameServer に変更されました。主な理由は次のとおりだと思います:

  • RocketMQ はすでにミドルウェアであるため、他のミドルウェアに依存したくありません。
  • Zookeeper は比較的重く、RocketMQ では使用できない機能が多いため、軽量な登録センターを作成することをお勧めします。
  • Zookeeper は CP です。リーダー選挙がトリガーされると、登録センターは利用できなくなります。ただし、RocketMQ の登録センターは、結果整合性が確保されている限り、強い一貫性を必要としません。

RocketMQ メッセージ ドメイン モデル

Message

  • 送信されたメッセージ。
  • メッセージにはトピックが必要です。
  • メッセージには複数のタグと複数のキーを含めることができ、これらはメッセージの追加属性とみなすことができます。

トピック

  • 1 種類のメッセージのコレクション。
  • 各メッセージにはトピックが必要です。
  • 第 1 レベルのメッセージのタイプ。

タグ

  • トピックに加えて、メッセージにはタグを付けることもできます。タグは、同じトピックの下でさまざまな種類のメッセージを細分化するために使用されます。
  • タグは必要ありません。
  • 第 2 レベルのメッセージのタイプ。

Group

ProducerGroup と ConsumerGroup に分かれていますが、ConsumerGroup に注目します。ConsumerGroup には複数の Consumer が含まれます。

クラスター消費モードでは、ConsumerGroup の下のコンシューマが一緒にトピックを消費し、各コンシューマは N 個のキューに割り当てられますが、キューは 1 つのコンシューマによってのみ消費されます。異なる ConsumerGroup は同じ A トピックを消費でき、メッセージは、このトピックにサブスクライブされているすべての ConsumerGroups によって消費されます。

キュー

  • トピックには、デフォルトで 4 つのキューが含まれています。
  • クラスター消費モードでは、同じ ConsumerGroup 内の Consumer は複数の Queue からのメッセージを消費できますが、1 つの Queue は 1 つの Consumer によってのみ消費されます。
  • キュー内のメッセージは順序付けされます。
  • 読み取りキューと書き込みキューに分かれており、一般に読み取りキューと書き込みキューの数が一致していないと問題が発生しやすくなります。

消費モード

クラスタリング (クラスター消費) とブロードキャスト (ブロードキャスト消費) の 2 つの消費モードがあります。

他の MQ とは異なり、他の MQ はメッセージ送信時にクラスター消費またはブロードキャスト消費を指定しますが、RocketMQ はクラスター消費またはブロードキャスト消費をコンシューマ側で設定します。

クラスタリング (クラスター消費)

デフォルトはクラスター消費モードです。このモードでは、ConsumerGroup のすべての Consumer が共同でトピックからのメッセージを消費し、各 Consumer は N からのメッセージを消費する責任があります。 (N は 1、またはキューに割り当てられていない 0 の場合もあります)、キューは 1 つのコンシューマによってのみ消費されます。 Consumer が死亡した場合、ConsumerGroup の下にある他の Consumer が引き継ぎ、消費を続けます。

クラスター消費モードでは、消費の進行状況はボーカー側で維持され、ストレージ パスは ${ROCKET_HOME}/store/config/consumerOffset.json です (次に示すように)。図: ついにここに...RocketMQ リテラシーの章UsetopicName@consumerGroupName は Key、消費の進行状況は Value です。Value の形式は queueId:offset で、複数の ConsumerGroup がある場合、消費の進行状況が各 ConsumerGroup の進行状況は異なるため、分離する必要があります。

ブロードキャスト (ブロードキャスト消費)

ブロードキャスト消費メッセージは、ConsumerGroup 内のすべての Consumer に送信されます。

ブロードキャスト消費モードでは、消費の進行状況はコンシューマ側で維持されます。

消費キューのロード アルゴリズムとリバランス メカニズム

消費キューのロード アルゴリズム

クラスター消費モードでは、ConsumerGroup の下のすべての Consumer が共同で Topic メッセージを消費することがわかっています。 N 個のキューからのメッセージを消費する責任がありますが、どのように割り当てられるのでしょうか?これには、消費キューのロード アルゴリズムが関係します。

RocketMQ は多数のコンシューマー キュー ロード アルゴリズムを提供します。その中で最もよく使用される 2 つのアルゴリズムは、AllocateMessageQueueAveragely と AllocateMessageQueueAveragelyByCircle です。これら 2 つのアルゴリズムの違いを見てみましょう。

トピックには、q0 ~ q15 で表される 16 個のキューと、c0 ~ c2 で表される 3 つのコンシューマがあるとします。

AllocateMessageQueueAveragely を使用してキュー負荷アルゴリズムを使用した結果は次のとおりです。

  • c0:q0 q1 q2 q3 q4 q5
  • c1:q6 q7 q8 q9 q10
  • c2:q11 q12 q13 q14 q15

AllocateMessageQueueAveragelyByCircle を使用してキュー負荷アルゴリズムを使用した結果は次のとおりです。

  • c0:q0 q3 q6 q9 q12 q15
  • c1: q1 q4 q7 q10 q13
  • c2: q2 q5 q8 q11 q14

ConsumerGroup 下のすべての Consumer が一緒にトピック メッセージを消費します、各コンシューマは N 個のキュー メッセージを消費する責任がありますが、1 つのキューを N 個のコンシューマが同時に消費することはできません。これは何を意味しますか?

賢明な方なら、トピックにキューが 4 つとコンシューマが 5 つしかない場合、1 つのコンシューマはどのキューにも割り当てられないと考えたはずです。そのため、RocketMQ では、トピックの下のキューの数は次のようになります。コンシューマの最大数を直接決定します。つまり、コンシューマを追加するだけでは消費速度を上げることはできません。

リバランス

トピックを作成する際にはキューの数を十分に考慮することが推奨されますが、実際の状況は満足できない場合が多く、キューの数が変わらなくてもコンシューマの数は変化します。コンシューマがオンラインまたはオフラインになったとき、コンシューマが電話を切ったとき、または新しいコンシューマが追加されたときなどに、変化が発生します。キューの拡大と縮小、およびコンシューマの拡大と縮小により、リバランスが行われます。つまり、消費キューがコンシューマに再分配されます。

RocketMQ では、コンシューマーはトピック キューの数を定期的にクエリします。コンシューマーの数が変化すると、リバランスがトリガーされます。

リバランスは RocketMQ によって内部的に実装されるため、プログラマは気にする必要はありません。

引くか押すか?

一般的に、MQ にはメッセージを取得する 2 つの方法があります:

  • プル: コンシューマが積極的にメッセージをプルします。利点は、コンシューマがプルするメッセージの頻度と数を制御できることです。自身の消費容量が大きいため、Consumer 側でメッセージの蓄積を引き起こすのは簡単ではありませんが、リアルタイム性はあまり良くなく、効率は比較的低くなります。
  • Push: Broker は積極的にメッセージを送信するため、リアルタイム性と効率性が高いという利点があります。しかし、Broker は Consumer の消費容量を知ることができません。Consumer にあまりにも多くのメッセージが送信されると、蓄積が発生します。 Consumer 側のメッセージの数; if send to Consumer のデータが少なすぎる場合、Consumer 側はアイドル状態になります。

プルでもプッシュでも、コンシューマは常にブローカーと対話します。対話方法には通常、短い接続、長い接続、ポーリングが含まれます。

RocketMQ はプルとプッシュの両方をサポートしているように見えますが、実際にはプッシュもプルを使用して実装されています。

これは RocketMQ の設計の独創的な部分で、短い接続でも長い接続でもポーリングでもなく、長いポーリングです。

ロングポーリング

コンシューマはメッセージをプルするリクエストを開始します。これは 2 つの状況に分けられます:

  • メッセージがあります: コンシューマがメッセージを取得した後、接続が切断されました。
  • メッセージなし: ボーカーは一定時間接続を保持します。5 秒ごとにメッセージがあるかどうかを確認します。メッセージがある場合は、コンシューマーと接続に送信されます。切断されています。

トランザクション メッセージ

RocketMQ はトランザクション メッセージをサポートします。プロデューサーがトランザクション メッセージをブローカーに送信した後、ブローカーはシステム トピック: RMQ_SYS_TRANS_HALF_TOPIC## にメッセージを保存します。 #、Consumer This メッセージを消費できないようにします。

ブローカーはスケジュールされたタスクを持ち、

RMQ_SYS_TRANS_HALF_TOPIC メッセージを消費し、プロデューサーへのレビューを開始します。レビューには、送信済み、ロールバック、不明の 3 つのステータスがあります。

    レビューのステータスが送信またはロールバックの場合、メッセージの送信とロールバックがトリガーされます。
  • #不明な場合は、次のレビューを待ちます。 RocketMQ では、メッセージのチェックバック間隔とチェックバック回数を設定でき、チェックバック回数が一定の回数を超えると、メッセージは自動的にロールバックされます。
遅延メッセージ

遅延メッセージとは、メッセージがブローカーに送信された後、コンシューマーがすぐに消費できないことを意味します。 RocketMQ は特定の遅延のみをサポートします。時間:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消費形式

RocketMQ は、同時消費と順次消費という 2 つの消費形式をサポートします。 順次に消費される場合は、並べ替えられたメッセージが同じキュー内にあることを確認する必要があります。送信するキューを選択する方法 RocketMQ にはメッセージ送信用のオーバーロードがいくつかあり、そのうちの 1 つはキューの選択をサポートしています。

同期ディスク フラッシュ、非同期ディスク フラッシュ

プロデューサーは Borker にメッセージを送信します。Borker はメッセージを永続化する必要があります。RocketMQ は 2 つの永続化戦略をサポートしています:

    同期フラッシュ: Borker は、Producer に ACK を返す前にメッセージを永続化するため、メッセージの信頼性が高いという利点がありますが、効率は遅くなります。
  • 非同期フラッシュ: ブローカーはメッセージを PageCache に書き込み、プロデューサーに ACK を返します。利点は非常に効率的であることですが、サーバーがハングアップするとメッセージが失われる可能性がありますが、RocketMQ サービスのみがハングアップする場合はメッセージは失われません。
同期レプリケーション、非同期レプリケーション

MQ の信頼性と可用性を確保するために、運用環境では通常、フォロワー ノードがデプロイされ、フォロワー ノードはマスターのノードをコピーします。 RocketMQ は 2 つのタイプをサポートします 永続的なレプリケーション戦略:

    同期レプリケーション: マスターとフォロワーの両方が、プロデューサーに ACK を返す前にメッセージを正常に書き込みます 信頼性は高くなりますが、効率は遅くなります。
  • 非同期レプリケーション: マスターが正常に書き込みを行う限り、ACK がプロデューサーに返されるため効率的ですが、メッセージが失われる可能性があります。
「書き込み」が PageCache に書き込まれるか、ハードディスクに書き込まれるかは、Follower Broker の設定によって異なります。

プロデューサーについて話しましょう

RocketMQ は、メッセージを送信する 3 つの方法を提供します。
  • oneway: ファイアアンドフォーゲット、一方向メッセージ、つまりメッセージが送信された後は無視されます。このメソッドには戻り値がありません。
  • 同期: メッセージが送信された後、同期的にボーカーの応答を待ちます。
  • 非同期: メッセージ送信後すぐに戻り、Boker の応答を受信した後、関数呼び出しメソッドが実行されます。

実際の開発では同期方式が一般的ですが、RocketMQ のパフォーマンスを向上させたい場合は、Borker 側のパラメータ、特にディスク ブラッシング戦略とレプリケーション戦略を変更することが一般的です。

メッセージ送信の再試行

メッセージ送信時に、MessageQueueSelector を使用すると、メッセージ送信の再試行機構が無効になります。

メッセージ送信に対する応答は、次の 4 つになる可能性があります:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}复制代码

最初の状況を除き、他の状況は問題があります。メッセージが失われないようにするには、次のことが必要です。プロデューサー パラメーターを設定するには: RetryAnotherBrokerWhenNotStoreOK が true です。

障害回避メカニズム

メッセージの送信に失敗した場合でも、再試行時にメッセージがボーカーに送信されるため、送信が失敗する可能性が高くなります。 RockteMQ の特徴は再試行時間であり、自動的にこの Borker を回避し、他の Borker を選択しますが、これまでのところ、非同期送信はそれほどスマートではなく、1 つの Borker でのみ再試行されるため、同期送信方法を選択することを強くお勧めします。

RocketMQ は 2 つの障害回避メカニズムを提供します。パラメータ SendLatencyFaultEnable を使用して制御します。

  • false: デフォルト値。障害回避メカニズムは再試行時にのみ有効になります。たとえば、BorkerA へのメッセージの送信が失敗した場合、再試行時には BorkerB が選択されますが、メッセージは次に送信されます。 、それでも BorkerA に送信することを選択します。
  • true: 遅延バックオフ メカニズムをオンにします。BorkerA へのメッセージの送信に失敗すると、BorkerA は一定期間利用できなくなると悲観的に判断し、それ以上のメッセージは BorkerA に送信されなくなります。将来のある期間。

遅延バックオフ メカニズムは非常に便利であるように見えますが、一般的に言えば、Borker 側がビジー状態であるため、Borker が利用できなくなったり、ネットワークが利用できなくなったりします。遅延バックオフのメカニズムがオンになっている場合、当初利用可能だったボーカーが一定期間回避され、他のボーカーが忙しくなり、状況がさらに悪化する可能性があります。

Consumer について話しましょう

Consumer スレッドの考慮事項

Consumer には、消費できる並列度を表す 2 つのパラメーターがあります。つまり、ConsumeThreadMin、## です。 # ConsumeThreadMax、コンシューマ側に蓄積されたメッセージが比較的少ない場合、コンシューマ スレッドの数は ConsumeThreadMin であるようです。コンシューマ側に蓄積されたメッセージがさらに多い場合、新しいスレッドが作成されます。コンシューマ スレッドの数が ConsumeThreadMax になるまで、消費のために自動的に開かれます。 Consumer は内部でスレッド プールを保持し、無制限のキューを使用します。つまり、ConsumeThreadMax パラメータは無効であるため、実際の開発では、ConsumeThreadMin, ConsumeThreadMax は、多くの場合、同じ値に設定されます。

ConsumeFromWhere

消費の進行状況をクエリできない場合、コンシューマはどこから消費を開始しますか? RocketMQ は、最新のメッセージ、最も古いメッセージ、指定されたタイムスタンプの 3 つの方法で消費をサポートします。

消費メッセージの再試行

RocketMQ は、ConsumerGroup ごとに

%RETRY% ConsumerGroup というトピック名で再試行キューを設定し、送信する必要がある再試行キューを保存します。 ConsumerGroup に送信されます。再試行メッセージですが、再試行には一定の遅延時間が必要です。RocketMQ は、最初にトピック名 SCHEDULE_TOPIC_XXXX で遅延キューに再試行メッセージを保存することによって再試行メッセージを処理し、次にバックグラウンドでスケジュールされたタスクが以下に従って遅延されます。 %RETRY% ConsumerGroup の再試行キューに再保存します。

メッセージが蓄積し、消費容量が不十分な場合はどうすればよいですか?

    これは、消費の進行状況を改善するための最良の方法です。
  • キューを追加し、コンシューマを追加します。
  • 元のコンシューマは、ブリック ムーバーとして、特定のルールに従ってメッセージを複数の新しいトピックに「移動」し、複数の ConsumerGroup を開いてさまざまなトピックを消費します。
  • 消費のために新しい ConsumerGroup を開きます。つまり、2 つの ConsumerGroup が同時に Topic を消費しますが、オフセットの判断に注意する必要があります。たとえば、ConsumerGroup は奇数のメッセージを消費します。 ConsumerGroup は偶数のメッセージを消費します。
もともと読み書きテキストならスラスラ書けるだろうと思っていましたが、考えすぎました 読み書きテキストなので、あまり交流のない友人向けですRocketMQ を使ってみたけど、RocketMQ ってそんなにいいの? 簡単ですよ、あまり RocketMQ に触れたことのない友達にブログを使ってスムーズに始めてもらうのは無理ですから、ブログを書きながら、これって大事なことなのかな、と思ったんです。この記事は基本的にさまざまな概念を紹介しており、API レベルについてはほとんど関与していないことがわかります。二週間では終わらないということ。

終了

関連する無料学習の推奨事項: Java 基本チュートリアル

以上がついにここに...RocketMQ リテラシーの章の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.imで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。