ホームページ >Java >&#&チュートリアル >SpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法
環境: springboot2.3.9RELEASE RocketMQ4.8.0
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.0</version> </dependency>
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
送信
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
Accept
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }</string>
Send
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
ハッシュキー
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }</string>
consumeMode = ConsumeMode.ORDERLYに基づいて別のキューに送信されるメッセージを示します。メッセージ モードはシーケンシャル モードで、1 つのキューと 1 つのスレッドです。
Result
consumeMode = ConsumeMode.CONCURRENTLY の場合、実行結果は次のようになります。
送信者
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
コンシューマー
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
messageModel = MessageModel.CLUSTERING
Test
ポート 8080 と 8081 で 2 つのサービスを開始します
8080 サービス
8081 サービス
クラスター メッセージ モードでは、各サービスは負荷分散を実現するためにメッセージの一部を個別に受信します。
コンシューマー エンド
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
messageModel = MessageModel。ブロードキャスティング
テスト
ポート 8080 と 8081 で 2 つのサービスを開始します
8080 サービス
8081 サービス
#クラスター メッセージ モードでは、各サービスは同じメッセージを受信します。 トランザクション メッセージRocketMQ トランザクションの 3 つのステータスTransactionStatus.CommitTransaction: トランザクション メッセージを送信し、コンシューマーはこのメッセージを消費できますTransactionStatus.RollbackTransaction: ローリングトランザクションを戻すということは、メッセージが削除され、使用できなくなることを意味します。 TransactionStatus.Unknown: 中間ステータス。ステータスを判断するためにメッセージ キューをチェックする必要があることを表します。 RocketMQ のトランザクション メッセージの実装は、主に 2 つの段階に分かれています: 通常のトランザクションの送信と送信、およびトランザクション情報の補償プロセスです。全体のプロセスは次のとおりです: 通常のトランザクションの送信と送信段階1. プロデューサは MQServer にハーフ メッセージを送信します (ハーフ メッセージとは、コンシューマが一時的に消費できないメッセージを指します)2. サーバーはメッセージの書き込み結果に応答し、ハーフ メッセージが送信されます正常に完了しました 3. ローカルトランザクションの実行を開始します4. ローカルトランザクションの実行状況に応じてCommitまたはRollbackを実行しますトランザクション情報の補償処理1. MQServer が長期間受信しない場合、ローカル トランザクションの実行ステータスにより、プロデューサーへの確認レビュー操作リクエストが開始されます。2. プロデューサが確認レビュー リクエストを受信した後、ローカル トランザクションの実行ステータスを確認します。3. 「結果を確認した後、コミットまたはロールバック操作を実行します。」によると、
#補償フェーズは主にタイムアウトまたはロールバックの問題を解決するために使用されます。プロデューサがコミットまたはロールバック操作を送信すると失敗します。
Sender
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
プロデューサーに対応するリスナー
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 这里执行本地的事务操作,比如保存数据。 try { // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
Consumer
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }</users>
Service
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("数据错误") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
Controller
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
Test
インターフェイスを呼び出した後のコンソール出力:
印刷ログから、コンシューマはすべての処理が完了した後でのみメッセージを受信することがわかります。メッセージが保存されました。
データを削除してから ID を 1 としてテストすると、エラーが発生します。
#データベースにデータがありません。 。 。
非常に複雑ではありませんか? 2 段階で処理できます。以上がSpringBoot が RocketMQ トランザクション、ブロードキャスト、連続メッセージを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。