大家好,我是君哥。
最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如100 條,第100 條訊息消費成功了,但是第50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。
1 拉取訊息
1.1 封裝拉取請求
#以RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
上面的DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是start。消費者啟動後會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取訊息的請求到 pullRequestQueue。這個過程涉及的UML 類別圖如下:
1.2 處理拉取請求
封裝好拉取訊息的請求PullRequest 後,RocketMQ 就會不停地從pullRequestQueue 取得訊息拉取請求進行處理。 UML 類別圖如下:
拉取訊息的入口方法是一個死循環,程式碼如下:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
這裡拉取到訊息後,提交給PullCallback 這個回呼函數進行處理。
拉取到的訊息先被 put 到 ProcessQueue 中的 msgTreeMap 上,然後被封裝到 ConsumeRequest 這個執行緒類別來處理。把程式碼精簡後,ConsumeRequest 處理邏輯如下:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 處理消費結果
2.1 並發訊息
並發訊息處理消費結果的程式碼做精簡後如下:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
從上面的程式碼可以看出,如果處理訊息的邏輯是串列的,例如文章開頭的程式碼使用for 迴圈來處理訊息,那如果在某一訊息處理失敗了,直接退出循環,給ConsumeConcurrentlyContext 的ackIndex 變數賦值為訊息清單中失敗訊息的位置,這樣這條失敗訊息後面的訊息就不再處理了,發送給Broker 等待重新拉取。程式碼如下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
消費成功的訊息則從 ProcessQueue 中的 msgTreeMap 中移除,並且傳回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:叢集模式偏移量保存在 Broker 端,更新偏移量需要傳送訊息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。
如果處理訊息的邏輯是並行的,處理訊息失敗後給 ackIndex 賦值是沒有意義的,因為可能有多條訊息失敗,給 ackIndex 變數賦值並不準確。最好的方法就是給 ackIndex 賦值 0,整批訊息全部重新消費,這樣又可能帶來冥等問題。
2.2 順序訊息
對於順序訊息,從msgTreeMap 取出訊息後,先要放到consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從consumingMsgOrderlyTreeMap 上取最大的訊息偏移量( lastKey)。
3 總結
回到開頭的問題,如果一批訊息按照順序消費,是不可能出現第100 條訊息消費成功了,但第50 條消費失敗的情況,因為第50 條訊息失敗的時候,應該退出循環,不再繼續進行消費。
如果是並發消費,如果出現了這種情況,建議是整批訊息全部重新消費,也就是給 ackIndex 賦值 0,這樣就必須考慮冥等問題。
以上是阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

在學術研究的動態領域,有效的信息收集,綜合和演示至關重要。 文獻綜述的手動過程是耗時的,阻礙了更深入的分析。 多代理研究助理系統BUI

AI世界中發生了絕對野生的事情。 Openai的本地形像生成現在很瘋狂。我們正在談論令人jaw目結舌的視覺效果,可怕的細節和拋光的輸出

毫不費力地將您的編碼願景帶入Codeium's Windsurf,這是您的AI驅動的編碼伴侶。 Windsurf簡化了整個軟件開發生命週期,從編碼和調試到優化,將過程轉換為INTU

Braiai的RMGB v2.0:強大的開源背景拆卸模型 圖像分割模型正在徹底改變各個領域,而背景刪除是進步的關鍵領域。 Braiai的RMGB v2.0是最先進的開源M

本文探討了大語言模型(LLM)中的毒性至關重要問題以及用於評估和減輕它的方法。 LLM,為從聊天機器人到內容生成的各種應用程序提供動力,需要強大的評估指標,機智

檢索增強發電(RAG)系統正在轉換信息訪問,但其有效性取決於檢索到的數據的質量。 這是重讀者變得至關重要的地方 - 充當搜索結果的質量過濾器,以確保僅確保

該教程通過在Google Colab中構建精緻的多式聯運檢索一代(RAG)管道來指導您。 我們將使用Gemma 3(用於語言和視覺),文檔(文檔轉換),Langchain等尖端工具

雷:擴展AI和Python應用程序的有力框架 Ray是一個革命性的開源框架,旨在輕鬆擴展AI和Python應用程序。 它的直觀API使研究人員和開發人員可以通過其代碼過渡


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

SublimeText3漢化版
中文版,非常好用

SublimeText3 英文版
推薦:為Win版本,支援程式碼提示!

SublimeText3 Linux新版
SublimeText3 Linux最新版

WebStorm Mac版
好用的JavaScript開發工具

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),