Rumah >Peranti teknologi >AI >Halaman kedua Alibaba: Pengguna RocketMQ menarik sekumpulan mesej, tetapi sesetengah daripada mereka gagal menggunakan Bagaimana untuk mengemas kini offset?
Hello semua, saya Abang Jun.
Baru-baru ini, seorang pembaca ditanya soalan semasa temu bual Jika pengguna menarik sekumpulan mesej, seperti 100, dan penggunaan mesej ke-100 berjaya, tetapi penggunaan mesej ke-50 gagal, offset Bagaimana ia akan dikemas kini? Mengenai isu ini, mari kita bincangkan hari ini tentang cara menyimpan offset jika sekumpulan mesej gagal digunakan.
Mengambil mod tolak RocketMQ sebagai contoh, kod permulaan pengguna RocketMQ adalah seperti berikut:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
DefaultMQPushConsumer di atas ialah pengguna mod tolak, dan kaedah permulaan adalah bermula. Selepas pengguna dimulakan, utas pengimbangan semula (RebalanceService) akan dicetuskan Tugas utas ini adalah untuk mengimbangi semula secara berterusan dalam gelung tak terhingga, dan akhirnya merangkum permintaan untuk menarik mesej ke dalam pullRequestQueue. Gambar rajah kelas UML yang terlibat dalam proses ini adalah seperti berikut:
Selepas merangkum permintaan tarik PullRequest, RocketMQ tidak akan menerima mesej secara berterusan tarik permintaan daripada pullRequestQueue untuk pemprosesan. Rajah kelas UML adalah seperti berikut:
Kaedah kemasukan untuk menarik mesej ialah gelung tak terhingga, kodnya adalah seperti berikut:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
Selepas mesej ditarik ke sini, Serahkannya ke fungsi panggil balik PullCallback untuk diproses.
Mesej yang ditarik mula-mula dimasukkan ke dalam msgTreeMap dalam ProcessQueue, dan kemudian dirangkumkan ke dalam kelas benang ConsumeRequest untuk pemprosesan. Selepas memperkemas kod, logik pemprosesan ConsumeRequest adalah seperti berikut:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
Kod untuk pemprosesan mesej serentak hasil penggunaan dipermudahkan seperti berikut:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
Seperti yang dapat dilihat daripada kod di atas, jika logik pemprosesan mesej adalah bersiri, contohnya, kod pada permulaan artikel menggunakan gelung for untuk memproses mesej , maka jika pemprosesan mesej tertentu gagal, terus Keluar dari gelung dan tetapkan pembolehubah ackIndex ConsumeConcurrentlyContext kepada kedudukan mesej yang gagal dalam senarai mesej dengan cara ini, mesej yang mengikuti mesej gagal ini tidak lagi akan diproses dan akan dihantar kepada Broker untuk menunggu penarikan semula. Kodnya adalah seperti berikut:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Mesej yang berjaya digunakan dialih keluar daripada msgTreeMap dalam ProcessQueue dan offset terkecil (firstKey) dalam msgTreeMap dikembalikan untuk kemas kini. Nota: Offset mod kluster disimpan di bahagian Broker Untuk mengemas kini offset, mesej perlu dihantar kepada Broker Walau bagaimanapun, offset mod siaran disimpan di bahagian Consumer dan hanya offset tempatan perlu dikemas kini.
Jika logik pemprosesan mesej adalah selari, adalah tidak bermakna untuk memberikan nilai kepada ackIndex selepas pemprosesan mesej gagal, kerana berbilang mesej mungkin gagal dan memberikan nilai kepada pembolehubah ackIndex adalah tidak tepat. Cara terbaik ialah menetapkan nilai 0 kepada ackIndex dan menggunakan keseluruhan kumpulan mesej sekali lagi, yang mungkin menyebabkan masalah lain.
Untuk mesej berjujukan, selepas mengambil mesej daripada msgTreeMap, ia mesti diletakkan pada consumingMsgOrderlyTreeMap Apabila mengemas kini offset, offset terbesar mesej diambil daripada consumingMsgOrderlyTreeMap. .
Berbalik kepada soalan awal, jika sekumpulan mesej digunakan mengikut urutan, adalah mustahil untuk mesej ke-100 berjaya digunakan, tetapi mesej ke-50 gagal, kerana Apabila mesej ke-50 gagal, gelung harus keluar dan penggunaan tidak boleh diteruskan.
Jika ia adalah penggunaan serentak, jika keadaan ini berlaku, adalah disyorkan agar keseluruhan kumpulan mesej digunakan semula, iaitu, berikan nilai 0 kepada ackIndex, supaya isu seperti underworld mesti dipertimbangkan .
Atas ialah kandungan terperinci Halaman kedua Alibaba: Pengguna RocketMQ menarik sekumpulan mesej, tetapi sesetengah daripada mereka gagal menggunakan Bagaimana untuk mengemas kini offset?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!