Heim >Java >Behandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter

Behandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter

WBOY
WBOYnach vorne
2024-02-05 22:00:101286Durchsuche
Frageninhalt

Wir verfügen über einige Spring-Integrationsprozesse, um Nachrichten zu verarbeiten, die über mqtt oder stomp eingehen. Hierfür nutzen wir Adapter mqttpahomessagedrivenchanneladapterstompinboundchanneladapter. Im Fall von mqtt haben wir beobachtet, dass der Adapter die Verbindung schließt und keine Nachrichten mehr empfängt, wenn ein Endpunkt im Stream eine Ausnahme auslöst. Wenn wir den Proxy neu starten, wird die Verbindung zu ihm ebenfalls nicht wieder hergestellt.

Um Ausnahmen zu behandeln, setzen wir den Namen des Fehlerkanals auf den Wert, der standardmäßig vom Spring-Adapter „errorchannel“ verarbeitet wird. Unsere Absicht ist es, nur die Ausnahme zu protokollieren, ohne die zugrunde liegende Verbindung zu schließen. Ist dies die richtige Art, Ausnahmen während des gesamten Prozesses zu behandeln?

Was das Problem der erneuten Verbindung betrifft, haben wir für jedes Transportprotokoll unterschiedliche Methoden.

  • Für mqtt werden wir connectionoptionsautomaticreconnect 设置为 true:
var clientfactory = new defaultmqttpahoclientfactory();
clientfactory.getconnectionoptions().setautomaticreconnect(true);

var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic");
adapter.seterrorchannelname("errorchannel");
  • Für stampfen setzen wir taskscheduler 设置为 reactornettytcpstompclient in den Kontext:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");

Ist dies der beste Weg, dieses Problem zu lösen?


Richtige Antwort


Ja, errorchannel 选项是抑制向 mqtt 客户端抛出异常的好方法。不必是全局 errorchannel ,它可能在许多不同的地方使用。 setautomaticreconnect(true) wird tatsächlich für Inbound-Channel-Adapter empfohlen.

Nicht verwendet in reactornettytcpstompclienttaskscheduler 不适用于重新连接。请参阅其 javadocs。我认为重新连接逻辑在 reactornettytcpstompclient:

public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) {
    connectionhandlingstompsession session = createsession(connectheaders, handler);
    this.tcpclient.connectasync(session);
    return session.getsession();
}

Fall der Wiederverbindung über eine andere Variante:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);

Das obige ist der detaillierte Inhalt vonBehandeln Sie zugrunde liegende MQTT- und STOMP-Verbindungen mit dem Spring Integration-Adapter. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen