Rumah  >  Artikel  >  Java  >  Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

PHPz
PHPzke hadapan
2023-05-11 20:13:161119semak imbas

Terdapat dua mod mesej RocketMQ utama: mod siaran dan mod kluster (mod pengimbangan beban)

Mod siaran ialah setiap pengguna akan menggunakan mesej

Mod pengimbangan beban ialah setiap pengguna Penggunaan hanya akan digunakan sekali oleh pengguna tertentu

Kami biasanya menggunakan mod pengimbangan beban dalam perniagaan kami Sudah tentu, beberapa senario khas memerlukan penggunaan mod siaran, seperti menghantar mesej ke e-mel , telefon mudah alih atau gesaan dalam tapak;

Kami boleh menetapkannya melalui nilai atribut @RocketMQMessageListener messageModel ialah mod siaran dan MessageModel.BROADCASTING ialah beban kelompok lalai mod pengimbangan. MessageModel.CLUSTERING

Mari perkenalkan springboot+ rockermq integrates untuk melaksanakan mesej siaran

  • Buat projek Springboot dan tambahkan pergantungan rockermq

  • <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
  • Konfigurasikan roketmq

# Port

pelayan:
port: 8083

# Konfigurasi roketmq

roket :
name-server: 127.0.0.1:9876
#producer
producer:
#Producer group name, yang mesti unik dalam aplikasi
group: group1
#The lalai tamat masa untuk penghantaran mesej ialah 3000ms
hantar-mesej- tamat masa: 3000
#Apabila mesej mencapai 4096 bait, mesej akan dimampatkan. Lalai 4096
compress-message-body-threshold: 4096
#Had mesej maksimum, lalai ialah 128K
max-message-size: 4194304
#Bilangan percubaan semula untuk penghantaran mesej penyegerakan yang gagal retry-times-when-send-failed: 3
#Sama ada hendak mencuba semula ejen lain apabila penghantaran dalaman gagal, parameter ini hanya berkuat kuasa apabila terdapat berbilang broker
cuba semula-next-server: true
# Nombor daripada cubaan semula apabila penghantaran mesej tak segerak gagal
masa cuba semula-apabila-hantar-async-gagal: 3

    Tamat pengeluaran: Cipta pengawal baharu untuk menghantar mesej
  • Tamat pengeluaran boleh menghantar mesej mengikut logik penghantaran biasa
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}

    Buat dua pengguna untuk menggunakan mesej
Mulakan perkhidmatan dan uji penggunaan mod kluster

Ujian mod kluster: dua pengguna berkongsi mesej secara sama rata

  • Tukar nilai atribut messageModel kedua-dua pengguna di atas kepada mod siaran

Pengguna 1: Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

Pengguna 2: Dalam Kumpulan dan topik pengguna yang sama seperti pengguna 1
    package com.example.springbootrocketdemo.config;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    /**
     * 广播消息
     * 配置RocketMQ监听
     * MessageModel.CLUSTERING:集群模式
     * MessageModel.BROADCASTING:广播模式
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
    public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("集群模式 消费者2,消费消息:"+s);
        }
    }
  • Mulakan semula perkhidmatan dan uji penggunaan mod siaran

Atas ialah kandungan terperinci Bagaimana untuk melaksanakan mesej siaran dalam RocketMQ dalam Springboot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam