Maison >Java >javaDidacticiel >Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

WBOY
WBOYavant
2023-05-18 10:04:051880parcourir

Environnement : springboot2.3.9RELEASE + RocketMQ4.8.0

Dépend de

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>

Fichier de configuration

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

Message normal

Envoyer

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }

Recevoir

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>

Message séquentiel

Envoyer

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }

Voici la clé de hachage qui envoie le message à Dans différentes files d'attente,

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>

consumeMode = ConsumeMode.ORDERLY indique que le mode message est le mode séquentiel, une file d'attente et un thread.

Résultat

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Lorsque consumerMode = ConsumeMode.CONCURRENTLY, le résultat de l'exécution est le suivant :

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Mode message cluster/diffusion

Expéditeur

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }

Mode message cluster

Consum euh

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

message Model = MessageModel .CLUSTERING

Test

Démarrez deux services dont les ports sont respectivement 8080 et 8081

Service 8080

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Service 8081

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

En mode message cluster, chaque service reçoit une partie du message séparément, réalisant la charge équilibrage

Mode message de diffusion

Côté consommateur

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

messageModel = MessageModel.BROADCASTING

Test

Démarrer deux services avec les ports 8080 et 8081

Service 8080

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Service 8081

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Dans le message du cluster mode , chaque service a reçu le même message séparément.

Message de transaction

3 statuts des transactions RocketMQ

TransactionStatus.CommitTransaction : validez le message de transaction, le consommateur peut consommer ce message

TransactionStatus.RollbackTransaction : annulez la transaction, ce qui signifie que le message sera supprimé et non autorisé à consommer.

TransactionStatus.Unknown : Statut intermédiaire, qui représente la nécessité de vérifier la file d'attente des messages pour déterminer le statut.

RocketMQ implémente les messages de transaction en deux phases principales : l'envoi et la soumission normaux des transactions et le processus de compensation des informations sur les transactions. Le processus global est le suivant :

Phase d'envoi et de soumission normale des transactions

1. (Le demi-message fait référence au message que le consommateur ne peut pas consommer temporairement)

2. Le serveur répond au résultat de l'écriture du message et le demi-message est envoyé avec succès

3. Commence à exécuter la transaction locale

4. ou Rollback en fonction de l'état d'exécution de la transaction locale Opération

Processus de compensation des informations sur la transaction

1. Si MQServer ne reçoit pas l'état d'exécution de la transaction locale pendant une longue période, il lancera une demande d'opération de révision de confirmation auprès du producteur

2. Une fois que le producteur a reçu la demande de révision de confirmation, vérifiez l'état d'exécution des transactions locales

3. Exécutez les opérations de validation ou de restauration en fonction des résultats de la vérification

La phase de compensation est principalement utilisée pour résoudre le problème de délai d'attente ou d'échec. lorsque les producteurs envoient des opérations Commit ou Rollback.

Expéditeur

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }

Producteur correspondant auditeur

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>

Service

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }

Controller

@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }

Test

Après avoir appelé l'interface, la console affiche

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

On peut le voir dans le journal d'impression. le consommateur ne reçoit le message qu'une fois que tout a été enregistré.

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Supprimez les données puis testez si l'ID est 1, une erreur sera signalée.

Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ

Il n'y a aucune donnée dans la base de données. . .

Ce n’est pas très compliqué, ça se gère en 2 étapes.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer