検索
ホームページJava&#&チュートリアルSpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

環境: springboot2.3.9RELEASE RocketMQ4.8.0

依存関係

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>

設定ファイル

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

通常のメッセージ

送信

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }

Accept

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>

連続メッセージ

Send

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }

ハッシュキー

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>

consumeMode = ConsumeMode.ORDERLYに基づいて別のキューに送信されるメッセージを示します。メッセージ モードはシーケンシャル モードで、1 つのキューと 1 つのスレッドです。

Result

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

consumeMode = ConsumeMode.CONCURRENTLY の場合、実行結果は次のようになります。

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

# #クラスター/ブロードキャスト メッセージ モード

送信者

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }

クラスター メッセージ モード

コンシューマー

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

messageModel = MessageModel.CLUSTERING

Test

ポート 8080 と 8081 で 2 つのサービスを開始します

8080 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

8081 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

クラスター メッセージ モードでは、各サービスは負荷分散を実現するためにメッセージの一部を個別に受信します。

ブロードキャスト メッセージ モード

コンシューマー エンド

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

messageModel = MessageModel。ブロードキャスティング

テスト

ポート 8080 と 8081 で 2 つのサービスを開始します

8080 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

8081 サービス

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

#クラスター メッセージ モードでは、各サービスは同じメッセージを受信します。

トランザクション メッセージ

RocketMQ トランザクションの 3 つのステータス

TransactionStatus.CommitTransaction: トランザクション メッセージを送信し、コンシューマーはこのメッセージを消費できます

TransactionStatus.RollbackTransaction: ローリングトランザクションを戻すということは、メッセージが削除され、使用できなくなることを意味します。

TransactionStatus.Unknown: 中間ステータス。ステータスを判断するためにメッセージ キューをチェックする必要があることを表します。

RocketMQ のトランザクション メッセージの実装は、主に 2 つの段階に分かれています: 通常のトランザクションの送信と送信、およびトランザクション情報の補償プロセスです。全体のプロセスは次のとおりです:

通常のトランザクションの送信と送信段階

1. プロデューサは MQServer にハーフ メッセージを送信します (ハーフ メッセージとは、コンシューマが一時的に消費できないメッセージを指します)

2. サーバーはメッセージの書き込み結果に応答し、ハーフ メッセージが送信されます正常に完了しました

3. ローカルトランザクションの実行を開始します

4. ローカルトランザクションの実行状況に応じてCommitまたはRollbackを実行します

トランザクション情報の補償処理

1. MQServer が長期間受信しない場合、ローカル トランザクションの実行ステータスにより、プロデューサーへの確認レビュー操作リクエストが開始されます。

2. プロデューサが確認レビュー リクエストを受信した後、ローカル トランザクションの実行ステータスを確認します。

3. 「結果を確認した後、コミットまたはロールバック操作を実行します。」によると、

#補償フェーズは主にタイムアウトまたはロールバックの問題を解決するために使用されます。プロデューサがコミットまたはロールバック操作を送信すると失敗します。

Sender

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }

プロデューサーに対応するリスナー

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }

Test

インターフェイスを呼び出した後のコンソール出力:

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法印刷ログから、コンシューマはすべての処理が完了した後でのみメッセージを受信することがわかります。メッセージが保存されました。

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法データを削除してから ID を 1 としてテストすると、エラーが発生します。

#データベースにデータがありません。 。 。 SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法

非常に複雑ではありませんか? 2 段階で処理できます。

以上がSpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事は亿速云で複製されています。侵害がある場合は、admin@php.cn までご連絡ください。
Java開発のどの側面がプラットフォームに依存していますか?Java開発のどの側面がプラットフォームに依存していますか?Apr 26, 2025 am 12:19 AM

javadevelopmentisnotentirelylylypratform-IndopentDuetoseveralfactors.1)jvmvariationsaffectperformanceandbehavioracrossdifferentos.2)nativeLibrariesviajniintroducePlatform-specificissues.3)giaiasystemsdifferbeTioneplateplatifflics.4)

さまざまなプラットフォームでJavaコードを実行するときにパフォーマンスの違いはありますか?なぜ?さまざまなプラットフォームでJavaコードを実行するときにパフォーマンスの違いはありますか?なぜ?Apr 26, 2025 am 12:15 AM

Javaコードは、さまざまなプラットフォームで実行するときにパフォーマンスの違いがあります。 1)JVMの実装と最適化戦略は、OracleJDKやOpenJDKなどとは異なります。 2)メモリ管理やスレッドスケジューリングなどのオペレーティングシステムの特性もパフォーマンスに影響します。 3)適切なJVMを選択し、JVMパラメーターとコード最適化を調整することにより、パフォーマンスを改善できます。

Javaのプラットフォームの独立性の制限は何ですか?Javaのプラットフォームの独立性の制限は何ですか?Apr 26, 2025 am 12:10 AM

java'splatformindepentedencehaslimitationsincludingporformanceoverhead、versioncompatibulisisues、changleSwithnativeLibraryIntegration、プラットフォーム固有の機能、およびjvminStallation/maintenation。

プラットフォームの独立性とクロスプラットフォーム開発の違いを説明します。プラットフォームの独立性とクロスプラットフォーム開発の違いを説明します。Apr 26, 2025 am 12:08 AM

PlatformEndependEncealLowsProgramStorunonAnyPlatformWithOdification、whilecross-platformdevelopmentReadreessomeplatform-specificAdjustments.platformindependence、explifiedByjava、unableSiversAlexecutionButMayCompromperformance

ジャストインタイム(JIT)コンピレーションは、Javaのパフォーマンスとプラットフォームの独立性にどのような影響を与えますか?ジャストインタイム(JIT)コンピレーションは、Javaのパフォーマンスとプラットフォームの独立性にどのような影響を与えますか?Apr 26, 2025 am 12:02 AM

jitcompalilationinjavaenhancesperformance whelemaintaining formindepence.1)itdynamicallyTrantesiNTODENATIVEMACHINECODEATRUNTIME、最適化されたコードを最適化すること、

Javaがクロスプラットフォームデスクトップアプリケーションを開発するための人気のある選択肢なのはなぜですか?Javaがクロスプラットフォームデスクトップアプリケーションを開発するための人気のある選択肢なのはなぜですか?Apr 25, 2025 am 12:23 AM

javaispopularforsoss-platformdesktopapplicationsduetoits "writeonce、runaynay" philosophy.1)itusesbytecodatiTatrunnanyjvm-adipplatform.2)ライブラリリケンディンガンドジャヴァフククレアティック - ルルクリス

Javaでプラットフォーム固有のコードを作成する必要がある場合がある状況について話し合います。Javaでプラットフォーム固有のコードを作成する必要がある場合がある状況について話し合います。Apr 25, 2025 am 12:22 AM

Javaでプラットフォーム固有のコードを作成する理由には、特定のオペレーティングシステム機能へのアクセス、特定のハードウェアとの対話、パフォーマンスの最適化が含まれます。 1)JNAまたはJNIを使​​用して、Windowsレジストリにアクセスします。 2)JNIを介してLinux固有のハードウェアドライバーと対話します。 3)金属を使用して、JNIを介してMacOSのゲームパフォーマンスを最適化します。それにもかかわらず、プラットフォーム固有のコードを書くことは、コードの移植性に影響を与え、複雑さを高め、パフォーマンスのオーバーヘッドとセキュリティのリスクをもたらす可能性があります。

プラットフォームの独立性に関連するJava開発の将来の傾向は何ですか?プラットフォームの独立性に関連するJava開発の将来の傾向は何ですか?Apr 25, 2025 am 12:12 AM

Javaは、クラウドネイティブアプリケーション、マルチプラットフォームの展開、および言語間の相互運用性を通じて、プラットフォームの独立性をさらに強化します。 1)クラウドネイティブアプリケーションは、GraalvmとQuarkusを使用してスタートアップ速度を向上させます。 2)Javaは、埋め込みデバイス、モバイルデバイス、量子コンピューターに拡張されます。 3)Graalvmを通じて、JavaはPythonやJavaScriptなどの言語とシームレスに統合して、言語間の相互運用性を高めます。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

mPDF

mPDF

mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

VSCode Windows 64 ビットのダウンロード

VSCode Windows 64 ビットのダウンロード

Microsoft によって発売された無料で強力な IDE エディター

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。