Nous avons des processus d'intégration Spring pour gérer les messages arrivant via mqtt ou stomp. Pour cela nous utilisons un adaptateur mqttpahomessagedrivenchanneladapter
和 stompinboundchanneladapter
.
Dans le cas de mqtt, nous avons observé que si un point de terminaison du flux lève une exception, l'adaptateur ferme la connexion et ne reçoit plus de messages. De même, si nous redémarrons le proxy, la connexion à celui-ci ne sera plus établie.
Afin de gérer les exceptions, nous définissons le nom du canal d'erreur sur la valeur gérée par Spring par défaut "errorchannel" adaptateur. Notre intention est de consigner uniquement l'exception sans fermer la connexion sous-jacente. Est-ce la bonne façon de gérer les exceptions tout au long du processus ?
Concernant la problématique de reconnexion, nous disposons de méthodes différentes pour chaque protocole de transport.
connectionoptions
的 automaticreconnect
设置为 true
: var clientfactory = new defaultmqttpahoclientfactory(); clientfactory.getconnectionoptions().setautomaticreconnect(true); var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic"); adapter.seterrorchannelname("errorchannel");
taskscheduler
设置为 reactornettytcpstompclient
en contexte : var stompClient = new ReactorNettyTcpStompClient(host, port); stompClient.setTaskScheduler(taskScheduler); var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient); var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue"); adapter.setErrorChannelName("errorChannel");
Est-ce la meilleure façon de gérer ce problème ?
Oui, errorchannel
选项是抑制向 mqtt 客户端抛出异常的好方法。不必是全局 errorchannel
,它可能在许多不同的地方使用。 setautomaticreconnect(true)
est en effet recommandé pour les adaptateurs de canaux entrants.
Non utilisé dans reactornettytcpstompclient
的 taskscheduler
不适用于重新连接。请参阅其 javadocs。我认为重新连接逻辑在 reactornettytcpstompclient
:
public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) { connectionhandlingstompsession session = createsession(connectheaders, handler); this.tcpclient.connectasync(session); return session.getsession(); }
Cas de reconnexion via une autre variante :
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!