


Alibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?
皆さんこんにちは、ジュン兄です。
最近、インタビューに参加したときに読者から質問がありました。コンシューマが 100 個などのメッセージのバッチをプルした場合、100 番目のメッセージの消費は成功しますが、50 番目のメッセージの消費は失敗します。 . オフセット どのように更新されますか?この問題に関して、今日はメッセージのバッチが消費されなかった場合にオフセットを保存する方法について話しましょう。
1 プル メッセージ
1.1 プル リクエストのカプセル化
RocketMQ プッシュ モードを例にとると、RocketMQ コンシューマのスタートアップ コードは次のとおりです。上記の DefaultMQPushConsumer はプッシュ モードのコンシューマーであり、起動メソッドは start です。コンシューマーが開始されると、リバランス スレッド (RebalanceService) がトリガーされます。このスレッドのタスクは、無限ループで継続的にリバランスを行い、最後にメッセージを pullRequestQueue にプルするリクエストをカプセル化することです。このプロセスに関係する UML クラス図は次のとおりです。
1.2 プル リクエストの処理
プル リクエスト PullRequest をカプセル化した後、RocketMQ はメッセージを継続的に取得しません。処理のために pullRequestQueue からリクエストをプルします。 UML クラス図は次のとおりです。
メッセージをプルするためのエントリ メソッドは無限ループであり、コードは次のとおりです。ここにメッセージがある場合、それを PullCallback に送信すると、このコールバック関数によって処理されます。
プルされたメッセージは、まず ProcessQueue の msgTreeMap に置かれ、次に処理のためにスレッド クラス ConsumeRequest にカプセル化されます。コードを合理化した後の ConsumeRequest 処理ロジックは次のようになります。
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
2 消費結果の処理
2.1 同時メッセージ
同時メッセージ処理の消費結果のコードは次のように簡略化されます。次のとおりです:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
上記のコードからわかるように、メッセージ処理のロジックがシリアルである場合、たとえば記事の冒頭のコードでは for ループを使用してメッセージを処理します。特定のメッセージの処理が失敗した場合は、ループを直接終了し、ConsumeConcurrentlyContext の ackIndex 変数にメッセージ リスト内の失敗したメッセージの位置が割り当てられるため、この失敗したメッセージに続くメッセージは処理されなくなり、ブローカーに送信されます。再引っ張りを待ちます。コードは次のとおりです。
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
正常に消費されたメッセージは ProcessQueue の msgTreeMap から削除され、msgTreeMap の最小オフセット (firstKey) が更新のために返されます。注: クラスター モード オフセットはブローカー側に保存されます。オフセットを更新するには、メッセージをブローカーに送信する必要があります。ただし、ブロードキャスト モード オフセットはコンシューマー側に保存され、ローカル オフセットのみを更新する必要があります。
メッセージ処理のロジックが並列である場合、複数のメッセージが失敗する可能性があり、ackIndex 変数への値の割り当ては正確ではないため、メッセージ処理が失敗した後に ackIndex に値を割り当てても意味がありません。最善の方法は、ackIndex に 0 の値を割り当て、メッセージのバッチ全体を再度消費することですが、これにより他の問題が発生する可能性があります。
2.2 連続メッセージ
連続メッセージの場合、msgTreeMap からメッセージを取得した後、最初に commerceMsgOrderlyTreeMap に配置する必要があります。オフセットを更新するとき、最大のメッセージ オフセットは 消費MsgOrderlyTreeMap ( lastKey) から取得されます。 。
3 要約
最初の質問に戻りますが、メッセージのバッチが順番に消費される場合、100 番目のメッセージが正常に消費され、50 番目のメッセージが失敗することはあり得ません。 50 番目のメッセージが失敗すると、ループが終了し、消費が続行されないためです。
同時消費の場合、この状況が発生した場合は、メッセージのバッチ全体を再度消費すること、つまり、ackIndex に 0 の値を割り当てることをお勧めします。考慮されます。
以上がAlibaba の 2 ページ目: RocketMQ コンシューマーはメッセージのバッチをプルしますが、一部のメッセージは消費できません。オフセットを更新するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

AIの急速な統合により悪化した職場での急成長能力の危機は、増分調整を超えて戦略的な変化を要求します。 これは、WTIの調査結果によって強調されています。従業員の68%がワークロードに苦労しており、BURにつながります

ジョン・サールの中国の部屋の議論:AIの理解への挑戦 Searleの思考実験は、人工知能が真に言語を理解できるのか、それとも真の意識を持っているのかを直接疑問に思っています。 チャインを無知な人を想像してください

中国のハイテク大手は、西部のカウンターパートと比較して、AI開発の別のコースを図っています。 技術的なベンチマークとAPI統合のみに焦点を当てるのではなく、「スクリーン認識」AIアシスタントを優先しています。

MCP:AIシステムに外部ツールにアクセスできるようになります モデルコンテキストプロトコル(MCP)により、AIアプリケーションは標準化されたインターフェイスを介して外部ツールとデータソースと対話できます。人類によって開発され、主要なAIプロバイダーによってサポートされているMCPは、言語モデルとエージェントが利用可能なツールを発見し、適切なパラメーターでそれらを呼び出すことができます。ただし、環境紛争、セキュリティの脆弱性、一貫性のないクロスプラットフォーム動作など、MCPサーバーの実装にはいくつかの課題があります。 Forbesの記事「人類のモデルコンテキストプロトコルは、AIエージェントの開発における大きなステップです」著者:Janakiram MSVDockerは、コンテナ化を通じてこれらの問題を解決します。 Docker Hubインフラストラクチャに基づいて構築されたドキュメント

最先端のテクノロジーと巧妙なビジネスの洞察力を活用して、コントロールを維持しながら非常に収益性の高いスケーラブルな企業を作成する先見の明のある起業家によって採用された6つの戦略。このガイドは、建設を目指している起業家向けのためのものです

Google Photosの新しいウルトラHDRツール:画像強化のゲームチェンジャー Google Photosは、強力なウルトラHDR変換ツールを導入し、標準的な写真を活気のある高ダイナミックレンジ画像に変換しました。この強化は写真家に利益をもたらします

技術アーキテクチャは、新たな認証の課題を解決します エージェントアイデンティティハブは、AIエージェントの実装を開始した後にのみ多くの組織が発見した問題に取り組んでいます。

(注:Googleは私の会社であるMoor Insights&Strategyのアドバイザリークライアントです。) AI:実験からエンタープライズ財団まで Google Cloud Next 2025は、実験機能からエンタープライズテクノロジーのコアコンポーネント、ストリームへのAIの進化を紹介しました


ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

ZendStudio 13.5.1 Mac
強力な PHP 統合開発環境

SublimeText3 英語版
推奨: Win バージョン、コードプロンプトをサポート!

メモ帳++7.3.1
使いやすく無料のコードエディター

SAP NetWeaver Server Adapter for Eclipse
Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

WebStorm Mac版
便利なJavaScript開発ツール

ホットトピック









