Maison  >  Article  >  Java  >  Comment SpringBoot implémente l'envoi et la réception de messages MQTT

Comment SpringBoot implémente l'envoi et la réception de messages MQTT

WBOY
WBOYavant
2023-05-12 21:31:121978parcourir

Logique d'interaction d'intégration Spring

Pour les éditeurs :

1. Le message est envoyé via la passerelle de message et l'instance de MessageChannel DirectChannel gère les détails d'envoi. MessageChannel 的实例 DirectChannel 处理发送的细节。

2.DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

1.通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel

2.同样由 MessageChannel 的实例 DirectChannel 处理消费细节。

Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler

2. Une fois que DirectChannel a reçu le message, il est envoyé en interne au sujet spécifié via l'instance MqttPahoMessageHandler de MessageHandler.

Pour les abonnés :

1. En injectant l'instance de MessageProducerSupport et de MqttPahoMessageDrivenChannelAdapter, le MessageChannel qui s'abonne au sujet et lie le message. la consommation est réalisée.

2. Les détails de consommation sont également gérés par l'instance DirectChannel de MessageChannel.

Le message de la chaîne sera envoyé à notre instance personnalisée MqttInboundMessageHandler pour consommation.

Vous pouvez voir que l'ensemble du processus de traitement est fondamentalement le même que le précédent. Spring Integration résume un tel ensemble de mécanismes de communication de messages, et les détails spécifiques de la communication sont déterminés par le middleware qu'il intègre.

1. Dépendance maven

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>

2. Fichier de configuration yaml 🎜
#mqtt配置
mqtt:
  username: 123
  password: 123
  #MQTT-服务器连接地址,如果有多个,用逗号隔开
  url: tcp://127.0.0.1:1883
  #MQTT-连接服务器默认客户端ID
  client:
    id: ${random.value}
  default:
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
    topic: topic,mqtt/test/#
    #连接超时
  completionTimeout: 3000
🎜3. Classe de configuration du consommateur producteur mqtt 🎜
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;
 
/**
 * mqtt 推送and接收 消息类
 **/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
 
    private static final byte[] WILL_DATA;
 
    static {
        WILL_DATA = "offline".getBytes();
    }
 
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.url}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;   //连接超时
 
    /**
     * MQTT连接器选项
     **/
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
 
    /**
     * MQTT工厂
     **/
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }
 
    /**
     * MQTT信息通道(生产者)
     **/
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(生产者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
        //设置转换器 发送bytes数据
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        return messageHandler;
    }
 
    /**
     * 配置client,监听的topic
     * MQTT消息订阅绑定(消费者)
     **/
    @Bean
    public MessageProducer inbound() {
        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    /**
     * MQTT信息通道(消费者)
     **/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息处理器(消费者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理接收消息
                mqttReceiveHandle.handle(message);
            }
        };
    }
}
🎜4. Classe de traitement des messages 🎜
/**
 * mqtt客户端消息处理类
 **/
@Slf4j
@Component
public class MqttReceiveHandle {
 
    public void handle(Message<?> message) {
        log.info("收到订阅消息: {}", message);
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        log.info("消息主题:{}", topic);
        Object payLoad = message.getPayload();
        byte[] data = (byte[]) payLoad;
        Packet packet = Packet.parse(data);
        log.info("发送的Packet数据{}", JSON.toJSONString(packet));
 
    }
}
🎜5. 🎜
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
 
/**
 * mqtt发送消息
 * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
 * **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param
     */
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
 
    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}
🎜 7. Test d'interface🎜
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttListener {
    /**
     * 连接失败的事件通知
     * @param mqttConnectionFailedEvent
     */
    @EventListener(classes = MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
        log.info("连接失败的事件通知");
    }
 
    /**
     * 已发送的事件通知
     * @param mqttMessageSentEvent
     */
    @EventListener(classes = MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
        log.info("已发送的事件通知");
    }
 
    /**
     * 已传输完成的事件通知
     * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
     * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
     * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
     * @param mqttMessageDeliveredEvent
     */
    @EventListener(classes = MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
        log.info("已传输完成的事件通知");
    }
 
    /**
     * 消息订阅的事件通知
     * @param mqttSubscribedEvent
     */
    @EventListener(classes = MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
        log.info("消息订阅的事件通知");
    }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer