Rumah >Java >javaTutorial >Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

WBOY
WBOYke hadapan
2023-05-18 10:04:051867semak imbas

Persekitaran: springboot2.3.9RELEASE + RocketMQ4.8.0

Kebergantungan

<dependency>   <groupid>org.springframework.boot</groupid>     <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency>     <groupid>org.apache.rocketmq</groupid>     <artifactid>rocketmq-spring-boot-starter</artifactid>     <version>2.2.0</version> </dependency>

Fail konfigurasi

rreee

Mesej biasa

Hantar semula 🎜>Terima

server:   port: 8080 --- rocketmq:   nameServer: localhost:9876   producer:     group: demo-mq

Mesej berurutan

Hantar

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String message) {   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }

Berikut ialah mesej yang dihantar ke baris gilir berbeza berdasarkan kunci cincang

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("接收到消息:" + message) ;     }  }</string>

consumeMode.OR = ConsumeMode menunjukkan bahawa mod mesej ialah mod berjujukan, satu baris gilir dan satu utas.

Keputusan

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutanApabila consumeMode = ConsumeMode.SECARA SAMPINGAN, keputusan pelaksanaan adalah seperti berikut:

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutanMod Mesej Kluster/Siaran

Penghantar

@Resource private RocketMQTemplate rocketMQTemplate ;  public void sendOrder(String topic, String message, String tags, int id) {     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),              "order-" + id, new SendCallback() {                 @Override                 public void onSuccess(SendResult sendResult) {                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;                 }                 @Override                 public void onException(Throwable e) {                     e.printStackTrace() ;                 }             }); }

Mod Mesej Kelompok

Pengguna

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",      selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;     }  }</string>

messageModel = MessageModel.CLUSTERING

Mulakan dua perkhidmatan dengan port 8080 dan 8081

perkhidmatan 8080

perkhidmatan 8081

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

Dalam mod mesej kluster, setiap perkhidmatan menerima sebahagian daripada mesej secara berasingan, mencapai pengimbangan bebanBagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutanMod mesej siaran

Pihak pengguna

@Resource private RocketMQTemplate rocketMQTemplate ;      public void send(String topic, String message, String tags) {     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }

messageModel = MessageModel.BROADCASTING

Uji

Mulakan dua perkhidmatan dengan port 8080 dan 8081

perkhidmatan 8080

perkhidmatan 8081Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan 🎜>

Dalam mod mesej kluster, setiap perkhidmatan menerima mesej yang sama.

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutanMesej urus niaga

Tiga status urus niaga RocketMQ

TransactionStatus.CommitTransaction: Serahkan mesej urus niaga, pengguna boleh menggunakan mesej ini

TransactionStatus. RollingbackTransaction: RollingbackTransaction kembali transaksi bermakna mesej akan dipadam dan tidak dibenarkan untuk digunakan.

TransactionStatus.Unknown: Status pertengahan, yang mewakili keperluan untuk menyemak baris gilir mesej untuk menentukan status.

RocketMQ melaksanakan mesej transaksi dalam dua fasa utama: penghantaran dan penyerahan transaksi biasa, dan proses pampasan maklumat transaksi Proses keseluruhan ialah:

Fasa penghantaran dan penyerahan transaksi biasa

. 1. Pengeluar menghantar separuh mesej kepada MQServer (separuh mesej merujuk kepada mesej yang pengguna tidak boleh gunakan buat sementara waktu)

2. Pelayan membalas hasil penulisan mesej dan separuh mesej berjaya dihantar

3. Mula melaksanakan transaksi tempatan

4. Lakukan operasi Komit atau Rollback mengikut status pelaksanaan transaksi tempatan

Proses pampasan maklumat transaksi

1. Jika MQServer tidak menerimanya untuk masa yang lama Status pelaksanaan transaksi tempatan akan memulakan permintaan operasi semakan pengesahan kepada pengeluar

2. Selepas pengeluar menerima permintaan semakan pengesahan, ia akan menyemak status pelaksanaan transaksi tempatan

3. Menurut Selepas menyemak keputusan, laksanakan operasi Komit atau Balikkan

Fasa pampasan digunakan terutamanya untuk menyelesaikan masalah tamat masa atau kegagalan apabila pengeluar menghantar operasi Commit atau Rollback.

Penghantar

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

Pendengar sepadan pengeluar

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",      selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> {      @Override     public void onMessage(String message) {         System.out.println("ConsumerBroadListener1接收到消息:" + message) ;     }  }</string>

Pengguna

@Resource private RocketMQTemplate rocketMQTemplate ;      public void sendTx(String topic, Long id, String tags) {     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),              UUID.randomUUID().toString().replaceAll("-", "")) ; }

Perkhidmatan

@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener {          @Resource     private BusinessService bs ;      @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {         // 这里执行本地的事务操作,比如保存数据。         try {             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据             String id = (String) msg.getHeaders().get("BID") ;             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ;             bs.save(users, new UsersLog(users.getId(), id)) ;         } catch (Exception e) {             e.printStackTrace() ;             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }      @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {         // 这里检查本地事务是否执行成功         String id = (String) msg.getHeaders().get("BID") ;         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ;         UsersLog usersLog = bs.queryUsersLog(id) ;         if (usersLog == null) {             return RocketMQLocalTransactionState.ROLLBACK ;         }         return RocketMQLocalTransactionState.COMMIT ;     }  }

Pengawal

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> {      @Override     public void onMessage(Users users) {         System.out.println("TX接收到消息:" + users) ;     }  }</users>

@Transactional public boolean save(Users users, UsersLog usersLog) {     usersRepository.save(users) ;     usersLogRepository.save(usersLog) ;     if (users.getId() == 1) {         throw new RuntimeException("数据错误") ;     }     return true ; }      public UsersLog queryUsersLog(String bid) {     return usersLogRepository.findByBid(bid) ; }
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) {     ps.sendTx("tx-topic", id, "tag10") ;     return "send transaction success" ; }
🎜>

Selepas memanggil antara muka, output konsol:

Daripada log cetakan, ia boleh dilihat bahawa pengguna menerima mesej hanya selepas semua mesej telah telah diselamatkan.

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

Memadamkan data dan kemudian menguji ID sebagai 1 akan mengakibatkan ralat.

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan

Tiada data dalam pangkalan data. . .

Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutanBukankah ia sangat rumit?

Atas ialah kandungan terperinci Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam