Rumah >Java >javaTutorial >Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan
Persekitaran: springboot2.3.9RELEASE + RocketMQ4.8.0
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.0</version> </dependency>
Hantar semula 🎜>Terima
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
Mesej berurutan
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
Berikut ialah mesej yang dihantar ke baris gilir berbeza berdasarkan kunci cincang
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }</string>
consumeMode.OR = ConsumeMode menunjukkan bahawa mod mesej ialah mod berjujukan, satu baris gilir dan satu utas.
Keputusan
Apabila consumeMode = ConsumeMode.SECARA SAMPINGAN, keputusan pelaksanaan adalah seperti berikut:
Mod Mesej Kluster/Siaran
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
Mod Mesej Kelompok
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }</string>
messageModel = MessageModel.CLUSTERING
Mulakan dua perkhidmatan dengan port 8080 dan 8081
perkhidmatan 8080
perkhidmatan 8081 Dalam mod mesej kluster, setiap perkhidmatan menerima sebahagian daripada mesej secara berasingan, mencapai pengimbangan bebanMod mesej siaranPihak pengguna@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
perkhidmatan 8081 🎜>
Dalam mod mesej kluster, setiap perkhidmatan menerima mesej yang sama.Mesej urus niaga
Tiga status urus niaga RocketMQ
TransactionStatus. RollingbackTransaction: RollingbackTransaction kembali transaksi bermakna mesej akan dipadam dan tidak dibenarkan untuk digunakan.
TransactionStatus.Unknown: Status pertengahan, yang mewakili keperluan untuk menyemak baris gilir mesej untuk menentukan status.
RocketMQ melaksanakan mesej transaksi dalam dua fasa utama: penghantaran dan penyerahan transaksi biasa, dan proses pampasan maklumat transaksi Proses keseluruhan ialah:
Fasa penghantaran dan penyerahan transaksi biasa
. 1. Pengeluar menghantar separuh mesej kepada MQServer (separuh mesej merujuk kepada mesej yang pengguna tidak boleh gunakan buat sementara waktu)
2. Pelayan membalas hasil penulisan mesej dan separuh mesej berjaya dihantar
3. Mula melaksanakan transaksi tempatan
4. Lakukan operasi Komit atau Rollback mengikut status pelaksanaan transaksi tempatan
Proses pampasan maklumat transaksi
1. Jika MQServer tidak menerimanya untuk masa yang lama Status pelaksanaan transaksi tempatan akan memulakan permintaan operasi semakan pengesahan kepada pengeluar
2. Selepas pengeluar menerima permintaan semakan pengesahan, ia akan menyemak status pelaksanaan transaksi tempatan
3. Menurut Selepas menyemak keputusan, laksanakan operasi Komit atau Balikkan
Fasa pampasan digunakan terutamanya untuk menyelesaikan masalah tamat masa atau kegagalan apabila pengeluar menghantar operasi Commit atau Rollback.
Penghantar
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
Pendengar sepadan pengeluar
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
Pengguna
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
Perkhidmatan
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 这里执行本地的事务操作,比如保存数据。 try { // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
Pengawal
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }</users>
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("数据错误") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }🎜>
Selepas memanggil antara muka, output konsol:
Daripada log cetakan, ia boleh dilihat bahawa pengguna menerima mesej hanya selepas semua mesej telah telah diselamatkan. Memadamkan data dan kemudian menguji ID sebagai 1 akan mengakibatkan ralat. Tiada data dalam pangkalan data. . .Bukankah ia sangat rumit?
Atas ialah kandungan terperinci Bagaimana SpringBoot menyepadukan transaksi RocketMQ, penyiaran dan mesej berurutan. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!