We have a few Spring Integration flows that process messages arriving via MQTT or STOMP. For this we use the adapters `MqttPahoMessageDrivenChannelAdapter` and `StompInboundChannelAdapter`.
In the case of MQTT, we observed that if any endpoint in the flow throws an exception, the adapter closes the connection and no longer receives messages. Likewise, if we restart the broker, the connection to it is not re-established.
To handle exceptions, we set the adapter's error channel name to `errorChannel`, the channel that Spring handles by default. Our intention is only to log the exception without closing the underlying connection. Is this the correct way to handle exceptions throughout the flow?
Regarding the reconnection issue, we use a different approach for each transport protocol.
For MQTT, we set `automaticReconnect` of the connection options to `true`:

    var clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.getConnectionOptions().setAutomaticReconnect(true);
    var adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
            MqttAsyncClient.generateClientId(), clientFactory, "/topic/mytopic");
    adapter.setErrorChannelName("errorChannel");

For STOMP, we pass the `TaskScheduler` from the context to the `ReactorNettyTcpStompClient`:

    var stompClient = new ReactorNettyTcpStompClient(host, port);
    stompClient.setTaskScheduler(taskScheduler);
    var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
    var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
    adapter.setErrorChannelName("errorChannel");

Is this the best way to handle this problem?
Yes, the `errorChannel` option is a good way to keep exceptions from being thrown back to the MQTT client. It doesn't have to be the global `errorChannel`, which may be used in many different places. `setAutomaticReconnect(true)` is indeed recommended for inbound channel adapters.
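To illustrate why an error channel keeps the connection alive, here is a minimal, library-free sketch. It is a model only, not Spring Integration's or Paho's implementation: `InboundAdapter`, `onMessage` and `isConnected` are hypothetical names, and the "connection dropped" flag stands in for the client shutting down when its message callback throws.

```java
import java.util.function.Consumer;

final class ErrorChannelSketch {

    /** Hypothetical stand-in for an inbound adapter that feeds payloads into a flow. */
    static final class InboundAdapter {
        private final Consumer<String> flow;
        private final Consumer<Exception> errorChannel; // null models "no error channel configured"
        private boolean connected = true;

        InboundAdapter(Consumer<String> flow, Consumer<Exception> errorChannel) {
            this.flow = flow;
            this.errorChannel = errorChannel;
        }

        /** Called for every message received from the broker. */
        void onMessage(String payload) {
            if (!connected) {
                return; // nothing is delivered once the connection is gone
            }
            try {
                flow.accept(payload);
            } catch (RuntimeException ex) {
                if (errorChannel != null) {
                    // The failure is consumed here, so the client callback returns normally.
                    errorChannel.accept(ex);
                } else {
                    // Assumption: an exception escaping the callback costs us the connection.
                    connected = false;
                }
            }
        }

        boolean isConnected() {
            return connected;
        }
    }
}
```

With an error consumer that only logs, a failing endpoint no longer affects delivery of later messages. In the real adapters this role is played by the channel named in `setErrorChannelName(...)` and whatever subscriber handles it.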
The `TaskScheduler` of `ReactorNettyTcpStompClient` is not related to reconnection; see its Javadocs. I think the reconnection logic is simply not used in `ReactorNettyTcpStompClient`:

    public CompletableFuture<StompSession> connectAsync(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
        ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
        this.tcpClient.connectAsync(session);
        return session.getSession();
    }

Reconnection is handled by another variant, which takes a `ReconnectStrategy`:
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
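
Since the STOMP client does not call that variant, one possible workaround is to retry the connect call at the application level. The sketch below is an assumption of mine, not something the answer prescribes and not Spring API: `ReconnectSketch`, `DelayPolicy` and `connectWithRetry` are hypothetical names. `DelayPolicy` mimics the idea of a reconnect strategy by returning the delay before the next attempt, or `null` to give up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class ReconnectSketch {

    /** Hypothetical policy: delay in milliseconds before the given attempt, or null to stop. */
    interface DelayPolicy {
        Long delayBefore(int attempt);
    }

    /** Calls connect until it succeeds or the policy gives up. */
    static <T> CompletableFuture<T> connectWithRetry(Supplier<CompletableFuture<T>> connect,
            DelayPolicy policy, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(connect, policy, scheduler, 0, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> connect, DelayPolicy policy,
            ScheduledExecutorService scheduler, int attemptCount, CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = connect.get();
        } catch (RuntimeException ex) {
            // Treat a synchronous failure like a failed future.
            future = CompletableFuture.failedFuture(ex);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Long delay = policy.delayBefore(attemptCount + 1);
            if (delay == null) {
                result.completeExceptionally(error); // policy says stop retrying
                return;
            }
            scheduler.schedule(
                    () -> attempt(connect, policy, scheduler, attemptCount + 1, result),
                    delay, TimeUnit.MILLISECONDS);
        });
    }
}
```

In an application the supplier would wrap the real connect call, for example `() -> stompClient.connectAsync(headers, handler)`, and a fixed policy such as `attempt -> 5000L` would retry every five seconds. This covers only the initial connection. Re-subscribing after a session is lost would need separate handling.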