Maison  >  Article  >  Java  >  Comment résoudre les pièges rencontrés lors de l'intégration de RocketMQ avec SpringBoot

Comment résoudre les pièges rencontrés lors de l'intégration de RocketMQ avec SpringBoot

WBOY
WBOYavant
2023-05-19 11:25:231725parcourir

Scénarios d'application

Lors de la mise en œuvre de la consommation RocketMQ, l'annotation @RocketMQMessageListener est généralement utilisée pour définir Group, Topic et selectorExpression (filtrage des données et règles de sélection). Afin de prendre en charge le filtrage dynamique des données, les expressions sont généralement utilisées, puis via Apollo). ou configuration cloud pour une commutation dynamique.

Présentation des dépendances

 <!-- RocketMq Spring Boot Starter-->
 <dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
  </dependency>

Code consommateur

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费到的数据为:"+s);
    }
}

Dépannage

RocketMQMessageListener L'expression de sélection par défaut de l'annotation entière est *, ce qui signifie recevoir toutes les données du sujet actuel. Si nous voulons configurer dynamiquement les balises, utilisez ${rocketmq.selectorExpression. } expression, vous constaterez que toutes les données ont été filtrées. Le suivi du code source (ListenerContainerConfiguration.java) a révélé que lors de la création de l'écouteur, les données de selectorExpression ont été écrasées après avoir obtenu les données correspondantes de la variable d'environnement d'environnement, provoquant le filtrage complet. la condition à changer est une expression.

@Override
    public void afterSingletonsInstantiated() {
    	// 获取所有所有使用了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
        	// 循环注册容器
            beans.forEach(this::registerContainer);
        }
    }
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
		// 校验当前bean是否实现了RocketMQListener接口
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
		// 获取bean上的annotation
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
		// 解析group及topic,可支持表达式
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
		// 注册bean的,调用createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();        
        container.setRocketMQMessageListener(annotation);        
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // 此处已经根据表达式将数据取出
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 此处将SelectorExpression的数据覆盖成了表达式
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }

Problème résolu

Étant donné que la classe ListenerContainerConfiguration implémente la méthode afterSingletonsInstantiated de l'interface SmartInitializingSingleton, nous pouvons analyser les données selectorExpression par réflexion et les réaffecter avant d'initialiser ListenerContainerConfiguration.

/**
 * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()){
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                if(Objects.nonNull(entry)){
                    memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                }
            }
        }
    }
}

Sauf pour la première fois, ce bug a été corrigé dans la version 2.1.0 du package de dépendances. Sous prétexte de ne pas provoquer de conflits de dépendances, il est recommandé d'utiliser la version 2.1.0 ou supérieure.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer