検索
ホームページテクノロジー周辺機器AIAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?

皆さんこんにちは、ジュン兄です。

最近、インタビューに参加したときに読者から質問がありました。コンシューマが 100 個などのメッセージのバッチをプルした場合、100 番目のメッセージの消費は成功しますが、50 番目のメッセージの消費は失敗します。 . オフセット どのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。

1 プル メッセージ

1.1 プル リクエストのカプセル化

RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマのスタートアップ コードは次のとおりです。上記の DefaultMQPushConsumer はプッシュ モードのコンシューマーであり、起動メソッドは start です。コンシューマーが開始されると、リバランス スレッド (RebalanceService) がトリガーされます。このスレッドのタスクは、無限ループで継続的にリバランスを行い、最後にメッセージを pullRequestQueue にプルするリクエストをカプセル化することです。このプロセスに関係する UML クラス図は次のとおりです。

Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?1.2 プル リクエストの処理

プル リクエスト PullRequest をカプセル化した後、RocketMQ はメッセージを継続的に取得しません。処理のために pullRequestQueue からリクエストをプルします。 UML クラス図は次のとおりです。

Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?メッセージをプルするためのエントリ メソッドは無限ループであり、コードは次のとおりです。ここにメッセージがある場合、それを PullCallback に送信すると、このコールバック関数によって処理されます。

プルされたメッセージは、まず ProcessQueue の msgTreeMap に置かれ、次に処理のためにスレッド クラス ConsumeRequest にカプセル化されます。コードを合理化した後の ConsumeRequest 処理ロジックは次のようになります。

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}

2 消費結果の処理

2.1 同時メッセージ

同時メッセージ処理の消費結果のコードは次のように簡略化されます。次のとおりです:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}

上記のコードからわかるように、メッセージ処理のロジックがシリアルである場合、たとえば記事の冒頭のコードでは for ループを使用してメッセージを処理します。特定のメッセージの処理が失敗した場合は、ループを直接終了し、ConsumeConcurrentlyContext の ackIndex 変数にメッセージ リスト内の失敗したメッセージの位置が割り当てられるため、この失敗したメッセージに続くメッセージは処理されなくなり、ブローカーに送信されます。再引っ張りを待ちます。コードは次のとおりです。

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}

正常に消費されたメッセージは ProcessQueue の msgTreeMap から削除され、msgTreeMap の最小オフセット (firstKey) が更新のために返されます。注: クラスター モード オフセットはブローカー側に保存されます。オフセットを更新するには、メッセージをブローカーに送信する必要があります。ただし、ブロードキャスト モード オフセットはコンシューマー側に保存され、ローカル オフセットのみを更新する必要があります。

メッセージ処理のロジックが並列である場合、複数のメッセージが失敗する可能性があり、ackIndex 変数への値の割り当ては正確ではないため、メッセージ処理が失敗した後に ackIndex に値を割り当てても意味がありません。最善の方法は、ackIndex に 0 の値を割り当て、メッセージのバッチ全体を再度消費することですが、これにより他の問題が発生する可能性があります。

2.2 連続メッセージ

連続メッセージの場合、msgTreeMap からメッセージを取得した後、最初に commerceMsgOrderlyTreeMap に配置する必要があります。オフセットを更新するとき、最大のメッセージ オフセットは 消費MsgOrderlyTreeMap ( lastKey) から取得されます。 。

3 要約

最初の質問に戻りますが、メッセージのバッチが順番に消費される場合、100 番目のメッセージが正常に消費され、50 番目のメッセージが失敗することはあり得ません。 50 番目のメッセージが失敗すると、ループが終了し、消費が続行されないためです。

同時消費の場合、この状況が発生した場合は、メッセージのバッチ全体を再度消費すること、つまり、ackIndex に 0 の値を割り当てることをお勧めします。考慮されます。

以上がAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事は51CTO.COMで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。
Microsoft Work Trend Index 2025は、職場の容量の緊張を示していますMicrosoft Work Trend Index 2025は、職場の容量の緊張を示していますApr 24, 2025 am 11:19 AM

AIの急速な統合により悪化した職場での急成長能力の危機は、増分調整を超えて戦略的な変化を要求します。 これは、WTIの調査結果によって強調されています。従業員の68%がワークロードに苦労しており、BURにつながります

AIは理解できますか?中国の部屋の議論はノーと言っていますが、それは正しいですか?AIは理解できますか?中国の部屋の議論はノーと言っていますが、それは正しいですか?Apr 24, 2025 am 11:18 AM

ジョン・サールの中国の部屋の議論:AIの理解への挑戦 Searleの思考実験は、人工知能が真に言語を理解できるのか、それとも真の意識を持っているのかを直接疑問に思っています。 チャインを無知な人を想像してください

中国の「スマート」AIアシスタントは、マイクロソフトのリコールのプライバシーの欠陥をエコーし​​ます中国の「スマート」AIアシスタントは、マイクロソフトのリコールのプライバシーの欠陥をエコーし​​ますApr 24, 2025 am 11:17 AM

中国のハイテク大手は、西部のカウンターパートと比較して、AI開発の別のコースを図っています。 技術的なベンチマークとAPI統合のみに焦点を当てるのではなく、「スクリーン認識」AIアシスタントを優先しています。

Dockerは、おなじみのコンテナワークフローをAIモデルとMCPツールにもたらしますDockerは、おなじみのコンテナワークフローをAIモデルとMCPツールにもたらしますApr 24, 2025 am 11:16 AM

MCP:AIシステムに外部ツールにアクセスできるようになります モデルコンテキストプロトコル(MCP)により、AIアプリケーションは標準化されたインターフェイスを介して外部ツールとデータソースと対話できます。人類によって開発され、主要なAIプロバイダーによってサポートされているMCPは、言語モデルとエージェントが利用可能なツールを発見し、適切なパラメーターでそれらを呼び出すことができます。ただし、環境紛争、セキュリティの脆弱性、一貫性のないクロスプラットフォーム動作など、MCPサーバーの実装にはいくつかの課題があります。 Forbesの記事「人類のモデルコンテキストプロトコルは、AIエージェントの開発における大きなステップです」著者:Janakiram MSVDockerは、コンテナ化を通じてこれらの問題を解決します。 Docker Hubインフラストラクチャに基づいて構築されたドキュメント

6億ドルのスタートアップを構築するために6つのAIストリートスマート戦略を使用する6億ドルのスタートアップを構築するために6つのAIストリートスマート戦略を使用するApr 24, 2025 am 11:15 AM

最先端のテクノロジーと巧妙なビジネスの洞察力を活用して、コントロールを維持しながら非常に収益性の高いスケーラブルな企業を作成する先見の明のある起業家によって採用された6つの戦略。このガイドは、建設を目指している起業家向けのためのものです

Googleフォトの更新は、すべての写真の見事なウルトラHDRのロックを解除しますGoogleフォトの更新は、すべての写真の見事なウルトラHDRのロックを解除しますApr 24, 2025 am 11:14 AM

Google Photosの新しいウルトラHDRツール:画像強化のゲームチェンジャー Google Photosは、強力なウルトラHDR変換ツールを導入し、標準的な写真を活気のある高ダイナミックレンジ画像に変換しました。この強化は写真家に利益をもたらします

Descopeは、AIエージェント統合の認証フレームワークを構築しますDescopeは、AIエージェント統合の認証フレームワークを構築しますApr 24, 2025 am 11:13 AM

技術アーキテクチャは、新たな認証の課題を解決します エージェントアイデンティティハブは、AIエージェントの実装を開始した後にのみ多くの組織が発見した問題に取り組んでいます。

Google Cloud Next2025と現代の仕事の接続された未来Google Cloud Next2025と現代の仕事の接続された未来Apr 24, 2025 am 11:12 AM

(注:Googleは私の会社であるMoor Insights&Strategyのアドバイザリークライアントです。) AI:実験からエンタープライズ財団まで Google Cloud Next 2025は、実験機能からエンタープライズテクノロジーのコアコンポーネント、ストリームへのAIの進化を紹介しました

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

SublimeText3 英語版

SublimeText3 英語版

推奨: Win バージョン、コードプロンプトをサポート!

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

WebStorm Mac版

WebStorm Mac版

便利なJavaScript開発ツール