首頁  >  文章  >  使用 Spring Integration 適配器處理底層 MQTT 和 STOMP 連接

使用 Spring Integration 適配器處理底層 MQTT 和 STOMP 連接

WBOY
WBOY轉載
2024-02-05 22:00:101216瀏覽
問題內容

我們有一些 spring integration 流程來處理透過 mqtt 或 stomp 到達的訊息。為此,我們使用適配器 mqttpahomessagedrivenchanneladapterstompinboundchanneladapter。 在 mqtt 的情況下,我們觀察到,如果流中的任何端點拋出異常,適配器將關閉連接並且不再接收訊息。同樣,如果我們重新啟動代理,則不會再次建立與其的連線。

為了處理異常問題,我們將錯誤通道名稱設定為spring預設處理的值「errorchannel」的適配器。我們的目的是只記錄異常,而不關閉底層連接。這是在整個流程中處理異常的正確方法嗎?

關於重新連接問題,我們對每種傳輸協定都有不同的方法。

  • 對於 mqtt,我們將 connectionoptionsautomaticreconnect 設定為 true
var clientfactory = new defaultmqttpahoclientfactory();
clientfactory.getconnectionoptions().setautomaticreconnect(true);

var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic");
adapter.seterrorchannelname("errorchannel");
  • 對於 stomp,我們將上下文中的 taskscheduler 設定為 reactornettytcpstompclient
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");

這是處理這個問題的最佳方法嗎?


正確答案


是的,errorchannel 選項是抑制向 mqtt 用戶端拋出例外的好方法。不必是全域 errorchannel ,它可能在許多不同的地方使用。 setautomaticreconnect(true) 確實建議用於入站通道適配器。

reactornettytcpstompclienttaskscheduler 不適用於重新連線。請參閱其 javadocs。我認為重新連接邏輯在 reactornettytcpstompclient 中沒有使用:

public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) {
    connectionhandlingstompsession session = createsession(connectheaders, handler);
    this.tcpclient.connectasync(session);
    return session.getsession();
}

透過另一種變體重新連接的情況:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);

以上是使用 Spring Integration 適配器處理底層 MQTT 和 STOMP 連接的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除