Heim >Java >javaLernprogramm >Wie SpringBoot das Senden und Empfangen von MQTT-Nachrichten implementiert
Für Herausgeber:
1. Die Nachricht wird über das Nachrichten-Gateway gesendet und die Instanz von MessageChannel
DirectChannel
verarbeitet die Sendedetails. MessageChannel
的实例 DirectChannel
处理发送的细节。
2.DirectChannel
收到消息后,内部通过 MessageHandler
的实例 MqttPahoMessageHandler
发送到指定的 Topic。
对于订阅者:
1.通过注入 MessageProducerSupport
的实例 MqttPahoMessageDrivenChannelAdapter
,实现订阅 Topic 和绑定消息消费的 MessageChannel
。
2.同样由 MessageChannel
的实例 DirectChannel
处理消费细节。
Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler
DirectChannel
die Nachricht empfangen hat, wird sie intern über die Instanz MqttPahoMessageHandler
von MessageHandler
an das angegebene Thema gesendet.
MessageProducerSupport
und MqttPahoMessageDrivenChannelAdapter
, dem MessageChannel
, der Topic abonniert und die Nachricht bindet Der Verbrauch wird realisiert. 2. Die Verbrauchsdetails werden auch von der Instanz DirectChannel
von MessageChannel
verarbeitet. Die Kanalnachricht wird zur Verwendung an unsere benutzerdefinierte MqttInboundMessageHandler
-Instanz gesendet. Sie können sehen, dass der gesamte Verarbeitungsprozess im Wesentlichen der gleiche ist wie der vorherige. Spring Integration abstrahiert einen solchen Satz von Nachrichtenkommunikationsmechanismen, und die spezifischen Kommunikationsdetails werden durch die integrierte Middleware bestimmt. 1. Maven-Abhängigkeit <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.5.5</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.5</version> </dependency>
#mqtt配置 mqtt: username: 123 password: 123 #MQTT-服务器连接地址,如果有多个,用逗号隔开 url: tcp://127.0.0.1:1883 #MQTT-连接服务器默认客户端ID client: id: ${random.value} default: #MQTT-默认的消息推送主题,实际可在调用接口时指定 topic: topic,mqtt/test/# #连接超时 completionTimeout: 3000🎜5. mqtt-Sendeschnittstelle qtt-Ereignisüberwachungsklasse 🎜
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推送and接收 消息类 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.default.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private int completionTimeout; //连接超时 /** * MQTT连接器选项 **/ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(true); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(10); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工厂 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生产者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) **/ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调 //设置转换器 发送bytes数据 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); return messageHandler; } /** * 配置client,监听的topic * MQTT消息订阅绑定(消费者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); adapter.setConverter(converter); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT信息通道(消费者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT消息处理器(消费者) **/ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理接收消息 mqttReceiveHandle.handle(message); } }; } }🎜 7. Schnittstellentest🎜
/** * mqtt客户端消息处理类 **/ @Slf4j @Component public class MqttReceiveHandle { public void handle(Message<?> message) { log.info("收到订阅消息: {}", message); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); log.info("消息主题:{}", topic); Object payLoad = message.getPayload(); byte[] data = (byte[]) payLoad; Packet packet = Packet.parse(data); log.info("发送的Packet数据{}", JSON.toJSONString(packet)); } }
Das obige ist der detaillierte Inhalt vonWie SpringBoot das Senden und Empfangen von MQTT-Nachrichten implementiert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!