Kami mempunyai beberapa proses penyepaduan musim bunga untuk mengendalikan mesej yang tiba melalui mqtt atau stomp. Untuk ini kami menggunakan penyesuai mqttpahomessagedrivenchanneladapter
和 stompinboundchanneladapter
.
Dalam kes mqtt, kami mendapati bahawa jika mana-mana titik akhir dalam strim melemparkan pengecualian, penyesuai menutup sambungan dan tidak lagi menerima mesej. Begitu juga, jika kita memulakan semula proksi, sambungan kepadanya tidak akan diwujudkan lagi.
Untuk mengendalikan pengecualian, kami menetapkan nama saluran ralat kepada nilai yang dikendalikan oleh spring secara lalai penyesuai "errorchannel". Hasrat kami adalah untuk hanya log pengecualian tanpa menutup sambungan asas. Adakah ini cara yang betul untuk mengendalikan pengecualian sepanjang proses?
Mengenai isu penyambungan semula, kami mempunyai kaedah yang berbeza untuk setiap protokol pengangkutan.
connectionoptions
的 automaticreconnect
设置为 true
: var clientfactory = new defaultmqttpahoclientfactory(); clientfactory.getconnectionoptions().setautomaticreconnect(true); var adapter = new mqttpahomessagedrivenchanneladapter("tcp://localhost:1883", mqttasyncclient.generateclientid(), clientfactory, "/topic/mytopic"); adapter.seterrorchannelname("errorchannel");
taskscheduler
设置为 reactornettytcpstompclient
dalam konteks: var stompClient = new ReactorNettyTcpStompClient(host, port); stompClient.setTaskScheduler(taskScheduler); var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient); var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue"); adapter.setErrorChannelName("errorChannel");
Adakah ini cara terbaik untuk menangani masalah ini?
Ya, errorchannel
选项是抑制向 mqtt 客户端抛出异常的好方法。不必是全局 errorchannel
,它可能在许多不同的地方使用。 setautomaticreconnect(true)
sememangnya disyorkan untuk penyesuai saluran masuk.
Tidak digunakan dalam reactornettytcpstompclient
的 taskscheduler
不适用于重新连接。请参阅其 javadocs。我认为重新连接逻辑在 reactornettytcpstompclient
:
public completablefuture<stompsession> connectasync(@nullable stompheaders connectheaders, stompsessionhandler handler) { connectionhandlingstompsession session = createsession(connectheaders, handler); this.tcpclient.connectasync(session); return session.getsession(); }
Kes penyambungan semula melalui varian lain:
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
Atas ialah kandungan terperinci Kendalikan sambungan MQTT dan STOMP asas menggunakan penyesuai Integrasi Spring. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!