>  기사  >  Java  >  RocketMQ를 SpringBoot와 통합할 때 직면하는 함정을 해결하는 방법

RocketMQ를 SpringBoot와 통합할 때 직면하는 함정을 해결하는 방법

WBOY
WBOY앞으로
2023-05-19 11:25:231724검색

애플리케이션 시나리오

RocketMQ 소비를 구현할 때 @RocketMQMessageListener 주석은 일반적으로 Group, Topic 및 selectorExpression(데이터 필터링 및 선택 규칙)을 정의하는 데 사용됩니다. 데이터의 동적 필터링을 지원하기 위해 일반적으로 표현식이 사용되며 Apollo를 통해 사용됩니다. 또는 동적 전환을 위한 클라우드 구성.

종속성 소개

 <!-- RocketMq Spring Boot Starter-->
 <dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
  </dependency>

소비자 코드

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费到的数据为:"+s);
    }
}

Troubleshooting

RocketMQMessageListener 전체 주석의 기본 selectorExpression은 *입니다. 이는 현재 주제 아래의 모든 데이터를 수신한다는 의미입니다. 태그를 동적으로 구성하려면 ${rocketmq.selectorExpression을 사용하세요. } 표현식을 사용하면 모든 데이터가 필터링된 것을 확인할 수 있습니다. 소스 코드(ListenerContainerConfiguration.java)를 추적하면 리스너 생성 시 환경 변수에서 해당 데이터를 가져온 후 selectorExpression의 데이터가 덮어쓰기되어 전체 필터링이 발생하는 것으로 나타났습니다. 조건이 바뀌는 표현입니다.

@Override
    public void afterSingletonsInstantiated() {
    	// 获取所有所有使用了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
        	// 循环注册容器
            beans.forEach(this::registerContainer);
        }
    }
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
		// 校验当前bean是否实现了RocketMQListener接口
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
		// 获取bean上的annotation
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
		// 解析group及topic,可支持表达式
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
		// 注册bean的,调用createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();        
        container.setRocketMQMessageListener(annotation);        
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // 此处已经根据表达式将数据取出
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 此处将SelectorExpression的数据覆盖成了表达式
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }

문제 해결

ListenerContainerConfiguration 클래스는 SmartInitializingSingleton 인터페이스의 afterSingletonsInstantiated 메서드를 구현하기 때문에 ListenerContainerConfiguration을 초기화하기 전에 리플렉션을 통해 selectorExpression 데이터를 구문 분석하고 다시 할당할 수 있습니다.

/**
 * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()){
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                if(Objects.nonNull(entry)){
                    memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                }
            }
        }
    }
}

처음 제외하고는 종속성 패키지 2.1.0 버전에서 이 버그가 수정되었습니다. 종속성 충돌을 일으키지 않는다는 전제하에 버전 2.1.0 이상을 사용하는 것이 좋습니다.

위 내용은 RocketMQ를 SpringBoot와 통합할 때 직면하는 함정을 해결하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제