搜尋
首頁Javajava教程SpringBoot整合RocketMQ的方法是什麼

1. SpringBoot整合RocketMQ

在SpringBoot中整合RocketMQ,只需要簡單四步驟:

1.引入相關依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

2.新增RocketMQ的相關配置

rocketmq:
    consumer:
        group: springboot_consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 10
    name-server: 10.5.103.6:9876
    producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: springboot_producer_group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试此处,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false

3.使用提供的模板工具類別RocketMQTemplate發送訊息

@RestController
public class NormalProduceController {
  @Setter(onMethod_ = @Autowired)
  private RocketMQTemplate rocketmqTemplate;
  
  @GetMapping("/test")
  public SendResult test() {
    Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
    SendResult sendResult = rocketmqTemplate.send(topic, msg);
  }
}

4.實作RocketMQListener介面消費訊息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理消息的逻辑
        System.out.println("Received message: " + message);
    }
}

以上4步驟即可實現SpringBoot與RocketMQ的整合,這部分屬於基礎知識,不做太多說明。

2 使用RocketMQ會遇到的問題

以下是一些在SpringBoot中使用RocketMQ時常遇到的問題,現在為您逐一解決。

2.1 WARN No appenders could be found for logger

#啟動專案時會在日誌中看到如下告警

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.

#此時我們只需要在啟動類別中設定環境變數 rocketmq.client.logUseSlf4j 為true 明確指定RocketMQ的日誌框架

@SpringBootApplication
public class RocketDemoApplication {

    public static void main(String[] args) {
        /*
         * 指定使用的日志框架,否则将会告警
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
      
        SpringApplication.run(RocketDemoApplication.class, args);
    }
}

同時還得在設定檔中調整日誌級別,不然在控制台會一直看到broker的日誌資訊

logging:
    level:
      RocketmqClient: ERROR
    io:
        netty: ERROR

Nate#Nate#Date#Date#Date#Date#Date

#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date##Date#Date#Date#Date#Date##Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#Date#D.在使用Java8後經常會使用

LocalDate/LocalDateTime這兩個時間類型字段,然而RocketMQ原始配置並不支援Java時間類型,當我們發送的實體訊息中包含上述兩個字段時,消費端在消費時會出現如下所示的錯誤。

例如生產者的程式碼如下:

@GetMapping("/test")
public void test(){
  //普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发。
  RocketMessage rocketMessage = RocketMessage.builder().
    id(1111L).
    message("hello,world")
    .localDate(LocalDate.now())
    .localDateTime(LocalDateTime.now())
    .build();
  rocketmqTemplate.convertAndSend(destination,rocketMessage);
}

消費者的程式碼如下:

@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")
public class RocketMQConsumer implements RocketMQListener<RocketMessage> {
    @Override
    public void onMessage(RocketMessage message) {
        System.out.println("消费消息-" + message);
    }
}

消費者開始消費時會出現類型轉換例外錯誤

Cannot construct instance of java.time.LocalDate,錯誤詳情如下:

SpringBoot整合RocketMQ的方法是什麼

原因:RocketMQ內建使用的轉換器是RocketMQMessageConverter,轉換Json時使用的是MappingJackson2MessageConverter,但是這個轉換器不支援時間類型。

解決方案:需要自訂訊息轉換器,將MappingJackson2MessageConverter進行替換,並新增支援時間模組

@Configuration
public class RocketMQEnhanceConfig {

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

2.3 RockeMQ環境隔離

在使用RocketMQ時,通常會在程式碼中直接指定訊息主題(topic),而且開發環境和測試環境可能共用一個RocketMQ環境。如果沒有進行處理,在開發環境發送的訊息就可能被測試環境的消費者消費,測試環境發送的訊息也可能被開發環境的消費者消費,從而導致數據混亂的問題。

為了解決這個問題,我們可以根據不同的環境實現自動隔離。透過簡單配置一個選項,如dev、test、prod等不同環境,所有的訊息都會自動隔離。例如,當傳送的訊息主題為

consumer_topic時,可以自動在topic後面加上環境後綴,如consumer_topic_dev

那麼,我們該如何實現呢?

可以寫一個設定類別實作BeanPostProcessor,並重寫postProcessBeforeInitialization方法,在監聽器實例初始化前修改對應的topic。

  • BeanPostProcessor是Spring框架中的一個接口,它的作用是在Spring容器實例化、配置完bean之後,在bean初始化前後進行一些額外的處理工作。

  • 具體來說,BeanPostProcessor介面定義了兩個方法:

    • postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前先處理,可以對bean做一些修改等操作。

    • postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之後進行處理,可以進行一些清理或其他動作。

  • BeanPostProcessor可以在應用程式中對Bean的建立和初始化過程進行攔截和修改,對Bean的生命週期進行幹預和操作。它可以對所有的Bean類別實例進行增強處理,使得開發人員可以在Bean初始化前後自訂一些操作,從而實現自己的業務需求。例如,可以透過BeanPostProcessor來實現注入某些必要的屬性值、加入某一個物件等等。

實作方案如下:

1.在設定檔中增加相關組態

rocketmq:
	enhance:
  	# 启动隔离,用于激活配置类EnvironmentIsolationConfig
  	# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
  	enabledIsolation: true
  	# 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
  	environment: dev

2.新增組態類,在實例化訊息監聽者之前把topic修改掉

@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
  	@Value("${rocketmq.enhance.enabledIsolation:true}")
    private boolean enabledIsolation;
    @Value("${rocketmq.enhance.environment:&#39;&#39;}")
    private String environmentName;
  
    /**
     * 在装载Bean之前实现参数修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
					  //拼接Topic
            if(enabledIsolation && StringUtils.hasText(environmentName)){
                container.setTopic(String.join("_", container.getTopic(),environmentName));
            }
            return container;
        }
        return bean;
    }
}

啟動專案可以看到日誌中訊息監聽的佇列已經被修改了

2023-03-23 17:04:59.726 [main] INFO  o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}

3. RocketMQ二次封装

在解释为什么要二次封装之前先来看看RocketMQ官方文档中推荐的最佳实践

  • 消息发送成功或者失败要打印消息日志,用于业务排查问题。

  • 如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

  • RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。

上面三个步骤基本每次发送消息或者消费消息都要实现,属于重复动作。

接下来讨论的是在RocketMQ中发送消息时选择何种消息类型最为合适。

在RocketMQ中有四种可选格式:

  • 发送Json对象

  • 发送转Json后的String对象

  • 根据业务封装对应实体类

  • 直接使用原生MessageExt接收。

对于如何选择消息类型,需要考虑到消费者在不查看消息发送者的情况下,如何获取消息的含义。因此,在这种情况下,使用第三种方式即根据业务封装对应实体类的方式最为合适,也是大多数开发者在发送消息时的常用方式。

有了上面两点结论以后我们来看看为什么要对RocketMQ二次封装。

3.1 为什么要二次封装

按照上述最佳实践,一个完整的消息传递链路从生产到消费应包括 准备消息、发送消息、记录消息日志、处理发送失败、记录接收消息日志、处理业务逻辑、异常处理和异常重试 等步骤。

虽然使用原生RocketMQ可以完成这些动作,但每个生产者和消费者都需要编写大量重复的代码来完成相同的任务,这就是需要进行二次封装的原因。我们希望通过二次封装,**生产者只需准备好消息实体并调用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理。 **

在二次封装中,关键是找出框架在日常使用中所涵盖的许多操作,以及区分哪些操作是可变的,哪些是不变的。以上述例子为例,实际上只有生产者的消息准备和消费者的业务处理是可变的操作,需要根据需求进行处理,而其他步骤可以固定下来形成一个模板。

当然,本文提到的二次封装不是指对源代码进行封装,而是针对工具的原始使用方式进行的封装。可以将其与Mybatis和Mybatis-plus区分开来。这两者都能完成任务,只不过Mybatis-plus更为简单便捷。

3.2 实现二次封装

实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。同时,在自定义starter中还需要解决文章第二部分中提到的一些问题。

代码结构如下所示:

SpringBoot整合RocketMQ的方法是什麼

3.2.1 消息实体类的封装
/**
 * 消息实体,所有消息都需要继承此类
 * 公众号:JAVA日知录
 */
@Data
public abstract class BaseMessage {
    /**
     * 业务键,用于RocketMQ控制台查看消费情况
     */
    protected String key;
    /**
     * 发送消息来源,用于排查问题
     */
    protected String source = "";

    /**
     * 发送时间
     */
    protected LocalDateTime sendTime = LocalDateTime.now();

    /**
     * 重试次数,用于判断重试次数,超过重试次数发送异常警告
     */
    protected Integer retryTimes = 0;
}

后面所有发送的消息实体都需要继承此实体类。

3.2.2 消息发送工具类的封装
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
    private final RocketMQTemplate template;

    @Resource
    private RocketEnhanceProperties rocketEnhanceProperties;

    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 根据系统上下文自动构建隔离后的topic
     * 构建目的地
     */
    public String buildDestination(String topic, String tag) {
        topic = reBuildTopic(topic);
        return topic + ":" + tag;
    }

    /**
     * 根据环境重新隔离topic
     * @param topic 原始topic
     */
    private String reBuildTopic(String topic) {
        if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
            return topic +"_" + rocketEnhanceProperties.getEnvironment();
        }
        return topic;
    }

    /**
     * 发送同步消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(buildDestination(topic,tag), message);
    }


    public <T extends BaseMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 发送延迟消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(buildDestination(topic,tag), message, delayLevel);
    }

    public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
}

这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。

3.2.3 消费者的封装
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
    /**
     * 默认重试次数
     */
    private static final int MAX_RETRY_TIMES = 3;

    /**
     * 延时等级
     */
    private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;


    @Resource
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    /**
     * 消息处理
     *
     * @param message 待处理消息
     * @throws Exception 消费异常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超过重试次数消息,需要启用isRetry
     *
     * @param message 待处理消息
     */
    protected abstract void handleMaxRetriesExceeded(T message);


    /**
     * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    protected boolean filter(T message) {
        return false;
    }

    /**
     * 是否异常时重复发送
     *
     * @return true: 消息重试,false:不重试
     */
    protected abstract boolean isRetry();

    /**
     * 消费异常时是否抛出异常
     * 返回true,则由rocketmq机制自动重试
     * false:消费异常(如果没有开启重试则消息会被自动ack)
     */
    protected abstract boolean throwException();

    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默认5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }

    /**
     * isRetry开启时,重新入队延迟时间
     * @return -1:立即入队重试
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }

    /**
     * 使用模板模式构建消息消费框架,可自由扩展或删减
     */
    public void dispatchMessage(T message) {
        // 基础日志记录被父类处理了
        log.info("消费者收到消息[{}]", JSONObject.toJSON(message));

        if (filter(message)) {
            log.info("消息id{}不满足消费条件,已过滤。",message.getKey());
            return;
        }
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
        try {
            long now = System.currentTimeMillis();
            handleMessage(message);
            long costTime = System.currentTimeMillis() - now;
            log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime);
        } catch (Exception e) {
            log.error("消息{}消费异常", message.getKey(),e);
            // 是捕获异常还是抛出,由子类决定
            if (throwException()) {
                //抛出异常,由DefaultMessageListenerConcurrently类处理
                throw new RuntimeException(e);
            }
            //此时如果不开启重试机制,则默认ACK了
            if (isRetry()) {
                handleRetry(message);
            }
        }
    }

    protected void handleRetry(T message) {
        // 获取子类RocketMQMessageListener注解拿到topic和tag
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            return;
        }
        //重新构建消息体
        String messageSource = message.getSource();
        if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
            message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
        }
        message.setRetryTimes(message.getRetryTimes() + 1);

        SendResult sendResult;

        try {
            // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
            sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
        } catch (Exception ex) {
            // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
            //由生产者直接发送
            throw new RuntimeException(ex);
        }
        // 发送失败的处理就是不进行ACK,由RocketMQ重试
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("重试消息发送失败");
        }

    }
}

使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常处理,异常重试等公共逻辑,消息过滤(查重)、业务处理则交由子类实现。

3.2.4 基础配置类
@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {

    /**
     * 注入增强的RocketMQEnhanceTemplate
     */
    @Bean
    public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
        return new RocketMQEnhanceTemplate(rocketMQTemplate);
    }

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }


    /**
     * 环境隔离配置
     */
    @Bean
    @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
    public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
        return new EnvironmentIsolationConfig(rocketEnhanceProperties);
    }

}
public class EnvironmentIsolationConfig implements BeanPostProcessor {
    private RocketEnhanceProperties rocketEnhanceProperties;

    public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
        this.rocketEnhanceProperties = rocketEnhanceProperties;
    }


    /**
     * 在装载Bean之前实现参数修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;

            if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
                container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
            }
            return container;
        }
        return bean;
    }
}
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {

    private boolean enabledIsolation;

    private String environment;
}

3.3 封装后的使用

3.3.1 引入依赖
 <dependency>
   <groupId>com.jianzh6</groupId>
   <artifactId>cloud-rocket-starter</artifactId>
</dependency>
3.3.2 自定义配置
rocketmq:
	...
	enhance:
		# 启动隔离,用于激活配置类EnvironmentIsolationConfig
  	# 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
  	enabledIsolation: true
    # 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
    environment: dev
3.3.3 发送消息
@RestController
@RequestMapping("enhance")
@Slf4j
public class EnhanceProduceController {

    //注入增强后的模板,可以自动实现环境隔离,日志记录
    @Setter(onMethod_ = @Autowired)
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    private static final String topic = "rocket_enhance";
    private static final String tag = "member";

    /**
     * 发送实体消息
     */
    @GetMapping("/member")
    public SendResult member() {
        String key = UUID.randomUUID().toString();
        MemberMessage message = new MemberMessage();
        // 设置业务key
        message.setKey(key);
        // 设置消息来源,便于查询
        message.setSource("MEMBER");
        // 业务消息内容
        message.setUserName("Java日知录");
        message.setBirthday(LocalDate.now());

        return rocketMQEnhanceTemplate.send(topic, tag, message);
    }
}

注意这里使用的是封装后的模板工具类,一旦在配置文件中启动环境隔离,则生产者的消息也自动发送到隔离后的topic中。

3.3.4 消费者
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {

    @Override
    protected void handleMessage(MemberMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        System.out.println("业务消息处理:"+message.getUserName());
    }

    @Override
    protected void handleMaxRetriesExceeded(MemberMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }


    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }

    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
  
    @Override
    protected boolean filter() {
        // 消息过滤
        return false;
    }

    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(MemberMessage memberMessage) {
        super.dispatchMessage(memberMessage);
    }
}

为了方便消费者对RocketMQ中的消息进行处理,我们可以使用EnhanceMessageHandler来进行消息的处理和逻辑的处理。

消費者實作了RocketMQListener的同時,可以繼承EnhanceMessageHandler來進行公共邏輯的處理,而核心業務邏輯需要自己實作handleMessage方法。如果需要對訊息進行過濾或去重的處理,則可以重寫父類別的filter方法進行實作。這樣可以更方便地對訊息進行處理,減輕開發者的工作量。

以上是SpringBoot整合RocketMQ的方法是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
為什麼Java是開發跨平台桌面應用程序的流行選擇?為什麼Java是開發跨平台桌面應用程序的流行選擇?Apr 25, 2025 am 12:23 AM

javaispopularforcross-platformdesktopapplicationsduetoits“ writeonce,runany where”哲學。 1)itusesbytiesebyTecodeThatrunsonAnyJvm-備用Platform.2)librarieslikeslikeslikeswingingandjavafxhelpcreatenative-lookingenative-lookinguisis.3)

討論可能需要在Java中編寫平台特定代碼的情況。討論可能需要在Java中編寫平台特定代碼的情況。Apr 25, 2025 am 12:22 AM

在Java中編寫平台特定代碼的原因包括訪問特定操作系統功能、與特定硬件交互和優化性能。 1)使用JNA或JNI訪問Windows註冊表;2)通過JNI與Linux特定硬件驅動程序交互;3)通過JNI使用Metal優化macOS上的遊戲性能。儘管如此,編寫平台特定代碼會影響代碼的可移植性、增加複雜性、可能帶來性能開銷和安全風險。

與平台獨立性相關的Java開發的未來趨勢是什麼?與平台獨立性相關的Java開發的未來趨勢是什麼?Apr 25, 2025 am 12:12 AM

Java將通過雲原生應用、多平台部署和跨語言互操作進一步提昇平台獨立性。 1)雲原生應用將使用GraalVM和Quarkus提升啟動速度。 2)Java將擴展到嵌入式設備、移動設備和量子計算機。 3)通過GraalVM,Java將與Python、JavaScript等語言無縫集成,增強跨語言互操作性。

Java的強鍵入如何有助於平台獨立性?Java的強鍵入如何有助於平台獨立性?Apr 25, 2025 am 12:11 AM

Java的強類型系統通過類型安全、統一的類型轉換和多態性確保了平台獨立性。 1)類型安全在編譯時進行類型檢查,避免運行時錯誤;2)統一的類型轉換規則在所有平台上一致;3)多態性和接口機制使代碼在不同平台上行為一致。

說明Java本機界面(JNI)如何損害平台獨立性。說明Java本機界面(JNI)如何損害平台獨立性。Apr 25, 2025 am 12:07 AM

JNI會破壞Java的平台獨立性。 1)JNI需要特定平台的本地庫,2)本地代碼需在目標平台編譯和鏈接,3)不同版本的操作系統或JVM可能需要不同的本地庫版本,4)本地代碼可能引入安全漏洞或導致程序崩潰。

是否有任何威脅或增強Java平台獨立性的新興技術?是否有任何威脅或增強Java平台獨立性的新興技術?Apr 24, 2025 am 12:11 AM

新興技術對Java的平台獨立性既有威脅也有增強。 1)雲計算和容器化技術如Docker增強了Java的平台獨立性,但需要優化以適應不同雲環境。 2)WebAssembly通過GraalVM編譯Java代碼,擴展了其平台獨立性,但需與其他語言競爭性能。

JVM的實現是什麼,它們都提供了相同的平台獨立性?JVM的實現是什麼,它們都提供了相同的平台獨立性?Apr 24, 2025 am 12:10 AM

不同JVM實現都能提供平台獨立性,但表現略有不同。 1.OracleHotSpot和OpenJDKJVM在平台獨立性上表現相似,但OpenJDK可能需額外配置。 2.IBMJ9JVM在特定操作系統上表現優化。 3.GraalVM支持多語言,需額外配置。 4.AzulZingJVM需特定平台調整。

平台獨立性如何降低發展成本和時間?平台獨立性如何降低發展成本和時間?Apr 24, 2025 am 12:08 AM

平台獨立性通過在多種操作系統上運行同一套代碼,降低開發成本和縮短開發時間。具體表現為:1.減少開發時間,只需維護一套代碼;2.降低維護成本,統一測試流程;3.快速迭代和團隊協作,簡化部署過程。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

Dreamweaver Mac版

Dreamweaver Mac版

視覺化網頁開發工具

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具