Bei der Implementierung des RocketMQ-Verbrauchs wird im Allgemeinen die Annotation @RocketMQMessageListener zum Definieren von Gruppe, Thema und Selektorausdruck (Datenfilter- und Auswahlregeln) verwendet. Verwenden Sie im Allgemeinen Ausdrücke und wechseln Sie dann dynamisch über Apollo oder Cloud Config.
<!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消费到的数据为:"+s); } }
RocketMQMessageListener Der Standard-SelektorAusdruck des gesamten Notation ist *, Das bedeutet, dass wir alle Daten unter dem aktuellen Thema empfangen möchten, wenn wir den Ausdruck ${rocketmq.selectorExpression} verwenden. Wenn wir den Quellcode (ListenerContainerConfiguration.java) verfolgen, wurde dieser gefunden Wird beim Erstellen des Listeners verwendet, nachdem die entsprechenden Daten über die Umgebungsvariable abgerufen wurden, wodurch die gesamte Filterbedingung in einen Ausdruck geändert wurde.
@Override public void afterSingletonsInstantiated() { // 获取所有所有使用了RocketMQMessageListener注解的bean Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { // 循环注册容器 beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // 校验当前bean是否实现了RocketMQListener接口 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // 获取bean上的annotation RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表达式 String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注册bean的,调用createRocketMQListenerContainer genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此处已经根据表达式将数据取出 String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此处将SelectorExpression的数据覆盖成了表达式 container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
Da die ListenerContainerConfiguration-Klasse die afterSingletonsInstantiated-Methode der SmartInitializingSingleton-Schnittstelle implementiert, können wir die SelectorExpression-Daten durch Reflektion analysieren und vor der Initialisierung der ListenerContainerConfiguration wieder zuweisen.
/** * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据 **/ @Configuration public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Autowired private StandardEnvironment environment; @Override public void afterPropertiesSet() throws Exception { Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { continue; } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler); for (Map.Entry<String,Object> entry: memberValues.entrySet()) { if(Objects.nonNull(entry)){ memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); } } } } }
Außer beim ersten Mal wurde dieser Fehler in der Version 2.1.0 des Abhängigkeitspakets behoben. Es wird empfohlen, das Versionspaket 2.1.0 oder höher zu verwenden, ohne dass es zu Abhängigkeitskonflikten kommt.
Das obige ist der detaillierte Inhalt vonSo lösen Sie die Fallstricke, die bei der Integration von RocketMQ mit SpringBoot auftreten. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!