我們有一些 spring integration 流程來處理透過 mqtt 或 stomp 到達的訊息。為此,我們使用適配器 mqttpahomessagedrivenchanneladapter
和 stompinboundchanneladapter
。
在 mqtt 的情況下,我們觀察到,如果流中的任何端點拋出異常,適配器將關閉連接並且不再接收訊息。同樣,如果我們重新啟動代理,則不會再次建立與其的連線。
為了處理異常問題,我們將錯誤通道名稱設定為spring預設處理的值「errorchannel」的適配器。我們的目的是只記錄異常,而不關閉底層連接。這是在整個流程中處理異常的正確方法嗎?
關於重新連接問題,我們對每種傳輸協定都有不同的方法。
connectionoptions
的 automaticreconnect
設定為 true
:var clientfactory = new defaultmqttpahoclientfactory(); clientfactory.getconnectionoptions().setautomaticreconnect(true); var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic"); adapter.seterrorchannelname("errorchannel");
taskscheduler
設定為 reactornettytcpstompclient
:var stompClient = new ReactorNettyTcpStompClient(host, port); stompClient.setTaskScheduler(taskScheduler); var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient); var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue"); adapter.setErrorChannelName("errorChannel");
這是處理這個問題的最佳方法嗎?
是的,errorchannel
選項是抑制向 mqtt 用戶端拋出例外的好方法。不必是全域 errorchannel
,它可能在許多不同的地方使用。 setautomaticreconnect(true)
確實建議用於入站通道適配器。
reactornettytcpstompclient
的 taskscheduler
不適用於重新連線。請參閱其 javadocs。我認為重新連接邏輯在 reactornettytcpstompclient
中沒有使用:
public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) { connectionhandlingstompsession session = createsession(connectheaders, handler); this.tcpclient.connectasync(session); return session.getsession(); }
透過另一種變體重新連接的情況:
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
以上是使用 Spring Integration 適配器處理底層 MQTT 和 STOMP 連接的詳細內容。更多資訊請關注PHP中文網其他相關文章!