springboot menyepadukan mqtt
Apabila menyediakan, jika anda menggunakan kluster, ingat untuk membuka port berikut:
Baiklah, Selepas pembinaan berjaya, langkah seterusnya ialah menyambungkan program java kami ke mqtt Terdapat dua cara (sebenarnya lebih daripada dua) untuk menyambung.
Salah satunya ialah menggunakan perpustakaan klien Java MQTT secara langsung
Kedua Adalah disyorkan untuk menggunakan, yang menjadi tumpuan kami spring integration mqtt
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.14</version> </dependency>Langkah kedua ialah menambah konfigurasi<.>
1 Mula-mula Tulis beberapa konfigurasi asas
mqtt: username: test # 账号 password: 123456 # 密码 host-url: tcp://127.0.0.1:1883 # mqtt连接tcp地址 in-client-id: ${random.value} # 随机值,使出入站 client ID 不同 out-client-id: ${random.value} client-id: ${random.int} # 客户端Id,不能相同,采用随机数 ${random.value} default-topic: test/#,topic/+/+/up # 默认主题 timeout: 60 # 超时时间 keepalive: 60 # 保持连接 clearSession: true # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
2. Kemudian tulis kelas yang sepadan
import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * MqttProperties * * @author hengzi * @date 2022/8/23 */ @Component public class MqttProperties { /** * 用户名 */ @Value("${mqtt.username}") private String username; /** * 密码 */ @Value("${mqtt.password}") private String password; /** * 连接地址 */ @Value("${mqtt.host-url}") private String hostUrl; /** * 进-客户Id */ @Value("${mqtt.in-client-id}") private String inClientId; /** * 出-客户Id */ @Value("${mqtt.out-client-id}") private String outClientId; /** * 客户Id */ @Value("${mqtt.client-id}") private String clientId; /** * 默认连接话题 */ @Value("${mqtt.default-topic}") private String defaultTopic; /** * 超时时间 */ @Value("${mqtt.timeout}") private int timeout; /** * 保持连接数 */ @Value("${mqtt.keepalive}") private int keepalive; /**是否清除session*/ @Value("${mqtt.clearSession}") private boolean clearSession; // ...getter and setter }
MqttProperties
Langkah seterusnya ialah mengkonfigurasi beberapa perkara yang berkonsepkan di sini seperti paip, penyesuai , masuk channel
, keluar adapter
, dsb., dsb., nampak sangat sakit kepala Inbound
Outbound
Baiklah, mari kita buat satu persatu ,
Pertama sekali, menyambung ke mqtt memerlukan pelanggan, kemudian kami akan membuka kilang pelanggan, yang boleh menjana ramai, ramai pelanggan
@Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; }
dan kemudian membina dua paip (
) , satu keluar, Satu masuk//出站消息管道, @Bean public MessageChannel mqttOutboundChannel(){ return new DirectChannel(); } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ return new DirectChannel(); }
channel
Untuk membolehkan tiub ini mengalir, penyesuai () // Mqtt 管道适配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); }
adapter
diperlukan dan kemudian pengeluar mesej ditakrifkan // 消息生产者 @Bean public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){ adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //入站投递的通道 adapter.setOutputChannel(mqttInboundChannel()); adapter.setQos(1); return adapter; }
Kemudian kami menerima mesej Di mana untuk mengendalikannya? Jawapannya ada di sini:
@Bean //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行 @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler handleMessage() { // 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面) return mqttMessageHandle; // 你也可以这样写 // return new MessageHandler() { // @Override // public void handleMessage(Message<?> message) throws MessagingException { // // do something // } // };
Pada ketika ini kami sebenarnya boleh menerima mesej daripada mqtt
Seterusnya, konfigurasikan menghantar mesej ke mqtt
Konfigurasikan Pemproses keluar
// 出站处理器 @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory factory){ MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return handler; }
Pada pendapat saya, pemproses keluar ini dikendalikan oleh orang lain (
), jadi saya tidak akan mengendalikannya sahaja bagaimana untuk menghantarnya, Untuk melengkapkanMqttPahoMessageHandler
MqttPahoMessageHandler
Seterusnya kami menentukan antara muka
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * MqttGateway * * @author hengzi * @date 2022/8/23 */ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data); }
Kami boleh terus menghubungi antara muka ini untuk menghantar data ke mqtt
Setakat ini, keseluruhan fail konfigurasi kelihatan seperti ini:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * MqttConfig * * @author hengzi * @date 2022/8/23 */ @Configuration public class MqttConfig { /** * 以下属性将在配置文件中读取 **/ @Autowired private MqttProperties mqttProperties; //Mqtt 客户端工厂 @Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } // Mqtt 管道适配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); } // 消息生产者 @Bean public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){ adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //入站投递的通道 adapter.setOutputChannel(mqttInboundChannel()); adapter.setQos(1); return adapter; } // 出站处理器 @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory factory){ MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return handler; } @Bean //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行 @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler handleMessage() { return mqttMessageHandle; } //出站消息管道, @Bean public MessageChannel mqttOutboundChannel(){ return new DirectChannel(); } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ return new DirectChannel(); } }
Memproses mesej
@Component public class MqttMessageHandle implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { } }
MqttMessageHandle
Setelah memahami lebih lanjut, saya dapati kawasan yang boleh dioptimumkan Contohnya, terdapat banyak jenis saluran digunakan di sini ialah Saluran mesej lalai, yang menghantar mesej kepada pelanggan dan kemudian menyekat penghantaran sehingga mesej diterima Semua kaedah penghantaran adalah segerak dan dijalankan oleh urutan.DirectChannel
Spring Integration
Di sini kami. Anda boleh menukar
channel
ExecutorChannel
@Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 最大可创建的线程数 int maxPoolSize = 200; executor.setMaxPoolSize(maxPoolSize); // 核心线程池大小 int corePoolSize = 50; executor.setCorePoolSize(corePoolSize); // 队列最大长度 int queueCapacity = 1000; executor.setQueueCapacity(queueCapacity); // 线程池维护线程所允许的空闲时间 int keepAliveSeconds = 300; executor.setKeepAliveSeconds(keepAliveSeconds); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ // 用线程池 return new ExecutorChannel(mqttThreadPoolTaskExecutor()); }
channel
yang boleh menggunakan berbilang benang Ia sebenarnya boleh dijalankan di sini
Tetapi konfigurasi ini masih banyak , sedikit Ia mengelirukan, jadi saya mencari tapak web rasmi dan menemui kaedah konfigurasi yang lebih mudah dipanggil
Java DSL
Kami merujuk kepada tapak web rasmi, mengubahnya sedikit dan mengkonfigurasinya menggunakan DSL:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * MqttConfigV2 * * @author hengzi * @date 2022/8/24 */ @Configuration public class MqttConfigV2 { @Autowired private MqttProperties mqttProperties; @Autowired private MqttMessageHandle mqttMessageHandle; //Mqtt 客户端工厂 所有客户端从这里产生 @Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } // Mqtt 管道适配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); } // 消息生产者 (接收,处理来自mqtt的消息) @Bean public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) { adapter.setCompletionTimeout(5000); adapter.setQos(1); return IntegrationFlows.from( adapter) .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())) .handle(mqttMessageHandle) .get(); } @Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 最大可创建的线程数 int maxPoolSize = 200; executor.setMaxPoolSize(maxPoolSize); // 核心线程池大小 int corePoolSize = 50; executor.setCorePoolSize(corePoolSize); // 队列最大长度 int queueCapacity = 1000; executor.setQueueCapacity(queueCapacity); // 线程池维护线程所允许的空闲时间 int keepAliveSeconds = 300; executor.setKeepAliveSeconds(keepAliveSeconds); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } // 出站处理器 (向 mqtt 发送消息) @Bean public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get(); } }
Lihat seperti ini Ia benar-benar lebih mudah, dan kepala saya tidak begitu besar saya harap saya tahu lebih awal.
Baiklah, perkara di atas adalah berkaitan dengan konfigurasi mqtt sebenarnya telah selesai.
Tetapi sebenarnya, saya sentiasa mempunyai idea, iaitu mesej yang kami terima semuanya dilaksanakan dalam kaedah
,@Override public void handleMessage(Message<?> message) throws MessagingException { }
handleMessage
Jadi saya telah idea, bolehkah ia berdasarkan apa yang saya langgan Topik dilaksanakan dalam kaedah yang berbeza Untuk masalah ini, anda sebenarnya boleh menggunakan untuk mencapainya, tetapi jelas sekali, jika saya melanggan banyak topik, ia akan menjadi. sakit kepala untuk menulis juga tahu sama ada ini boleh dicapai, jadi saya tidak akan membincangkannya di sini if ... else ...
Yang kedua ialah, saya tidak tahu bagaimana untuk menukar nama saya merujuk kepada reka bentuk
Spring Integration
, kami menambah Ini bermakna kelas ini ialah kelas perkhidmatan yang pakar dalam memproses mesej mqttrouter
Pada masa yang sama, menambah channel
kepada kaedah kelas ini bermakna topik ini dikendalikan oleh kaedah ini.
OK, teorinya ada, Langkah seterusnya ialah latihan spring
@Controller
Mula-mula tentukan dua anotasi
import org.springframework.core.annotation.AliasFor; import org.springframework.stereotype.Component; import java.lang.annotation.*; @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface MqttService { @AliasFor( annotation = Component.class ) String value() default ""; }
ditambah dengan anotasi @Controller
, spring akan mengimbasnya dan mendaftarkannya dalam bekas IOC @RequestMapping
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface MqttTopic { /** * 主题名字 */ String value() default ""; }Rujukan
Kita harus menggunakannya seperti ini:@MqttService
rreee
OK The langkah seterusnya ialah melaksanakan penggunaan sedemikian@MqttTopic
anotasi, kemudian melintasi kelas ini, cari kaedah @Component
, dan kemudian bandingkan nilai anotasi @RequestMapping
dengan topik yang diterima. >Berhenti bercakap kosong, mari kita ke kod
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; /** * MqttTopicHandle * * @author hengzi * @date 2022/8/24 */ @MqttService public class MqttTopicHandle { public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class); // 这里的 # 号是通配符 @MqttTopic("test/#") public void test(Message<?> message){ log.info("test="+message.getPayload()); } // 这里的 + 号是通配符 @MqttTopic("topic/+/+/up") public void up(Message<?> message){ log.info("up="+message.getPayload()); } // 注意 你必须先订阅 @MqttTopic("topic/1/2/down") public void down(Message<?> message){ log.info("down="+message.getPayload()); } }
Kelas alat
import org.springframework.aop.framework.AopContext; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Map; /** * spring工具类 方便在非spring管理环境中获取bean * */ @Component public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware { /** Spring应用上下文环境 */ private static ConfigurableListableBeanFactory beanFactory; private static ApplicationContext applicationContext; public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{ return beanFactory.getBeansWithAnnotation(clsName); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.applicationContext = applicationContext; } /** * 获取对象 * * @param name * @return Object 一个以所给名字注册的bean的实例 * @throws org.springframework.beans.BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 获取类型为requiredType的对象 * * @param clz * @return * @throws org.springframework.beans.BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注册对象的类型 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果给定的bean名字在bean定义中有别名,则返回这些别名 * * @param name * @return * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 获取aop代理对象 * * @param invoker * @return */ @SuppressWarnings("unchecked") public static <T> T getAopProxy(T invoker) { return (T) AopContext.currentProxy(); } /** * 获取当前的环境配置,无配置返回null * * @return 当前的环境配置 */ public static String[] getActiveProfiles() { return applicationContext.getEnvironment().getActiveProfiles(); } }
OK, 大功告成. 终于舒服了, 终于不用写if...else...
了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~
以上!
参考文章:
使用 Spring integration 在Springboot中集成Mqtt
Spring Integration(一)概述
附:
动态添加主题方式:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.stereotype.Service; import java.util.Arrays; /** * MqttService * * @author hengzi * @date 2022/8/25 */ @Service public class MqttService { @Autowired private MqttPahoMessageDrivenChannelAdapter adapter; public void addTopic(String topic) { addTopic(topic, 1); } public void addTopic(String topic,int qos) { String[] topics = adapter.getTopic(); if(!Arrays.asList(topics).contains(topic)){ adapter.addTopic(topic,qos); } } public void removeTopic(String topic) { adapter.removeTopic(topic); } }
直接调用就行
Atas ialah kandungan terperinci Bagaimana springboot menyepadukan mqtt. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!