search
HomeJavaHandle underlying MQTT and STOMP connections using the Spring Integration adapter

Question content

We have some spring integration processes to handle messages arriving via mqtt or stomp. For this we use adapters mqttpahomessagedrivenchanneladapter and stompinboundchanneladapter. In the case of mqtt, we observed that if any endpoint in the stream throws an exception, the adapter closes the connection and no longer receives messages. Likewise, if we restart the proxy, the connection to it will not be established again.

In order to handle exceptions, we set the error channel name to the value handled by spring by default "errorchannel" adapter. Our intention is to only log the exception without closing the underlying connection. Is this the correct way to handle exceptions throughout the process?

Regarding the reconnection issue, we have different methods for each transport protocol.

  • For mqtt, we set automaticreconnect of connectionoptions to true:
var clientfactory = new defaultmqttpahoclientfactory();
clientfactory.getconnectionoptions().setautomaticreconnect(true);

var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic");
adapter.seterrorchannelname("errorchannel");
  • For stomp, we set the taskscheduler in the context to reactornettytcpstompclient:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");

Is this the best way to handle this problem?


Correct answer


Yes, the errorchannel option is a good way to suppress exceptions being thrown to mqtt clients. Doesn't have to be global errorchannel, it may be used in many different places. setautomaticreconnect(true) Really recommended for inbound channel adapters.

reactornettytcpstompclient's taskscheduler does not work with reconnections. See its javadocs. I think the reconnection logic is not used in reactornettytcpstompclient:

public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) {
    connectionhandlingstompsession session = createsession(connectheaders, handler);
    this.tcpclient.connectasync(session);
    return session.getsession();
}

Case of reconnection via another variant:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);

The above is the detailed content of Handle underlying MQTT and STOMP connections using the Spring Integration adapter. For more information, please follow other related articles on the PHP Chinese website!

Statement
This article is reproduced at:stackoverflow. If there is any infringement, please contact admin@php.cn delete

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

WebStorm Mac version

WebStorm Mac version

Useful JavaScript development tools