For the publisher:
1. The message is sent through the message gateway, and an instance of MessageChannel (DirectChannel) handles the sending details.
2. After receiving the message, DirectChannel internally forwards it to the specified topic through an instance of MessageHandler (MqttPahoMessageHandler).
For subscribers:
1. By injecting an instance of MessageProducerSupport (MqttPahoMessageDrivenChannelAdapter), you can subscribe to topics and bind the MessageChannel on which the received messages are consumed.
2. The consumption details are likewise handled by an instance of MessageChannel (DirectChannel).
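Messages on the channel are then delivered to our custom MqttInboundMessageHandler instance for consumption (in the code below this role is played by a MessageHandler bean that delegates to MqttReceiveHandle).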
As you can see, the overall processing flow is basically the same as in the previous example. Spring Integration abstracts a common message-communication mechanism, and the specific communication details are determined by the middleware it integrates with.
<!--
  Maven dependencies for Spring Integration with MQTT support.
  NOTE(review): when the project uses Spring Boot's dependency management, the Spring Integration
  version is normally chosen by Boot; confirm that 5.5.5 matches what starter 2.5.1 pulls in.
-->

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>
# MQTT configuration (keys are read via @Value("${mqtt....}") in the Java configuration class).
mqtt:
  # Credentials used when connecting to the broker.
  username: 123
  password: 123
  # MQTT broker connection URI.
  # NOTE(review): a comma-separated list of URIs only works if the consuming code splits it - confirm.
  url: tcp://127.0.0.1:1883
  client:
    # Base client ID; the configuration appends "_producer" / "_consumer" to it.
    id: ${random.value}
  default:
    # Comma-separated topics. The inbound adapter subscribes to each entry; the same value is also
    # handed to the outbound handler as its default topic (a topic can be given per call instead).
    topic: topic,mqtt/test/#
  # Completion timeout passed to the inbound adapter (presumably milliseconds - confirm).
  completionTimeout: 3000
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推送and接收 消息类 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.default.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private int completionTimeout; //连接超时 /** * MQTT连接器选项 **/ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new 
MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(true); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(10); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工厂 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生产者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) **/ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调 //设置转换器 发送bytes数据 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); return messageHandler; } /** * 配置client,监听的topic * MQTT消息订阅绑定(消费者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new 
DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); adapter.setConverter(converter); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT信息通道(消费者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT消息处理器(消费者) **/ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理接收消息 mqttReceiveHandle.handle(message); } }; } }
/**
 * Processes messages received from subscribed MQTT topics.
 */
@Slf4j
@Component
public class MqttReceiveHandle {

    /**
     * Logs the received message and its topic, then parses the binary payload into a
     * {@code Packet} and logs it as JSON.
     *
     * <p>Messages whose payload is not a {@code byte[]} are logged and skipped.
     *
     * @param message the message delivered by the inbound MQTT channel
     */
    public void handle(Message<?> message) {
        log.info("收到订阅消息: {}", message);

        // String.valueOf tolerates a missing header, where calling toString() on it would throw.
        Object topicHeader = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String topic = String.valueOf(topicHeader);
        log.info("消息主题:{}", topic);

        Object payload = message.getPayload();
        if (!(payload instanceof byte[])) {
            // Guard the cast: a converter not configured for byte payloads may deliver another type.
            log.warn("Skipping MQTT message on topic {}: expected byte[] payload but got {}",
                    topic, payload.getClass().getName());
            return;
        }

        Packet packet = Packet.parse((byte[]) payload);
        log.info("发送的Packet数据{}", JSON.toJSONString(packet));
    }
}
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway for publishing to the MQTT broker.
 *
 * <p>Every call is sent to the {@code mqttOutboundChannel} channel declared in the MQTT
 * configuration class; the topic (and optionally the QoS) travel as message headers.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a text payload.
     *
     * @param topic   destination topic
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Publishes a text payload with an explicit quality of service.
     *
     * @param topic   destination topic
     * @param qos     delivery guarantee:
     *                0 - no redelivery if the subscriber misses the message, so it may be lost;
     *                1 - retried until received, so the subscriber may see duplicates;
     *                2 - adds de-duplication so the subscriber receives the message once
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);

    /**
     * Publishes a binary payload.
     *
     * @param topic   destination topic
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);

    /**
     * Publishes an arbitrary object payload.
     *
     * <p>NOTE(review): whether a given payload type can be sent depends on the converter set on
     * the outbound handler - confirm before passing anything other than text or bytes.
     *
     * @param topic   destination topic
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);

    /**
     * Publishes a binary payload; same parameters as {@link #sendToMqtt(String, byte[])} under a
     * distinct method name.
     *
     * @param topic   destination topic
     * @param payload message body
     */
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttListener { /** * 连接失败的事件通知 * @param mqttConnectionFailedEvent */ @EventListener(classes = MqttConnectionFailedEvent.class) public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) { log.info("连接失败的事件通知"); } /** * 已发送的事件通知 * @param mqttMessageSentEvent */ @EventListener(classes = MqttMessageSentEvent.class) public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) { log.info("已发送的事件通知"); } /** * 已传输完成的事件通知 * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执 * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知 * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知 * @param mqttMessageDeliveredEvent */ @EventListener(classes = MqttMessageDeliveredEvent.class) public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) { log.info("已传输完成的事件通知"); } /** * 消息订阅的事件通知 * @param mqttSubscribedEvent */ @EventListener(classes = MqttSubscribedEvent.class) public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) { log.info("消息订阅的事件通知"); } }
@Resource private MqttGateway mqttGateway; /** * sendData 消息 * topic 订阅主题 **/ @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST) public String sendMqtt(String sendData, String topic) { MqttMessage mqttMessage = new MqttMessage(); mqttGateway.sendToMqtt(topic, sendData); //mqttGateway.sendToMqttObject(topic, sendData.getBytes()); return "OK"; }
The above is the detailed content of How SpringBoot implements MQTT message sending and receiving. For more information, please follow other related articles on the PHP Chinese website!