Apabila melaksanakan penggunaan RocketMQ, anotasi @RocketMQMessageListener biasanya digunakan untuk mentakrifkan Kumpulan, Topik dan SelectorExpression (peraturan penapisan dan pemilihan data untuk menyokong penapisan data yang dinamik, ungkapan secara amnya). used , dan kemudian beralih secara dinamik melalui apollo atau konfigurasi awan.
<!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消费到的数据为:"+s); } }
Ekspresi pemilih lalai bagi keseluruhan anotasi RocketMQMessageListener ialah *, yang bermaksud semua data menerima Topik semasa. Jika kami ingin mengkonfigurasi teg secara dinamik, kami akan mendapati bahawa semua data ditapis apabila menggunakan ungkapan ${rocketmq.selectorExpression} Menjejaki kod sumber (ListenerContainerConfiguration.java) mendapati bahawa data selectorExpression berada dalam pembolehubah persekitaran. apabila mencipta pendengar Selepas memperoleh data yang sepadan, ia telah ditimpa, menyebabkan keseluruhan keadaan penapis ditukar kepada ungkapan.
@Override public void afterSingletonsInstantiated() { // 获取所有所有使用了RocketMQMessageListener注解的bean Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { // 循环注册容器 beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // 校验当前bean是否实现了RocketMQListener接口 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // 获取bean上的annotation RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表达式 String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注册bean的,调用createRocketMQListenerContainer genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此处已经根据表达式将数据取出 String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此处将SelectorExpression的数据覆盖成了表达式 container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
Oleh kerana kelas ListenerContainerConfiguration melaksanakan kaedah afterSingletonsInstantiated antara muka SmartInitializingSingleton, kami boleh menghuraikan data selectorExpression melalui refleksi dan menetapkannya semula sebelum memulakan ListenerContainerContainer.
/** * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据 **/ @Configuration public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Autowired private StandardEnvironment environment; @Override public void afterPropertiesSet() throws Exception { Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { continue; } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler); for (Map.Entry<String,Object> entry: memberValues.entrySet()) { if(Objects.nonNull(entry)){ memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); } } } } }
Kecuali buat kali pertama, pepijat ini telah dibetulkan dalam versi 2.1.0 pakej pergantungan. Adalah disyorkan untuk menggunakan pakej versi 2.1.0 atau lebih tinggi tanpa menyebabkan konflik pergantungan.
Atas ialah kandungan terperinci Bagaimana untuk menyelesaikan masalah yang dihadapi semasa menyepadukan RocketMQ dengan SpringBoot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!