搜尋
首頁科技週邊人工智慧阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

大家好,我是君哥。

最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如100 條,第100 條訊息消費成功了,但是第50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。

1 拉取訊息

1.1 封裝拉取請求

#以RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}

上面的DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是start。消費者啟動後會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取訊息的請求到 pullRequestQueue。這個過程涉及的UML 類別圖如下:

阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

1.2 處理拉取請求

封裝好拉取訊息的請求PullRequest 後,RocketMQ 就會不停地從pullRequestQueue 取得訊息拉取請求進行處理。 UML 類別圖如下:

阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

拉取訊息的入口方法是一個死循環,程式碼如下:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}

這裡拉取到訊息後,提交給PullCallback 這個回呼函數進行處理。

拉取到的訊息先被 put 到 ProcessQueue 中的 msgTreeMap 上,然後被封裝到 ConsumeRequest 這個執行緒類別來處理。把程式碼精簡後,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}

2 處理消費結果

2.1 並發訊息

並發訊息處理消費結果的程式碼做精簡後如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}

從上面的程式碼可以看出,如果處理訊息的邏輯是串列的,例如文章開頭的程式碼使用for 迴圈來處理訊息,那如果在某一訊息處理失敗了,直接退出循環,給ConsumeConcurrentlyContext 的ackIndex 變數賦值為訊息清單中失敗訊息的位置,這樣這條失敗訊息後面的訊息就不再處理了,發送給Broker 等待重新拉取。程式碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}

消費成功的訊息則從 ProcessQueue 中的 msgTreeMap 中移除,並且傳回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:叢集模式偏移量保存在 Broker 端,更新偏移量需要傳送訊息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理訊息的邏輯是並行的,處理訊息失敗後給 ackIndex 賦值是沒有意義的,因為可能有多條訊息失敗,給 ackIndex 變數賦值並不準確。最好的方法就是給 ackIndex 賦值 0,整批訊息全部重新消費,這樣又可能帶來冥等問題。

2.2 順序訊息

對於順序訊息,從msgTreeMap 取出訊息後,先要放到consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從consumingMsgOrderlyTreeMap 上取最大的訊息偏移量( lastKey)。

3 總結

回到開頭的問題,如果一批訊息按照順序消費,是不可能出現第100 條訊息消費成功了,但第50 條消費失敗的情況,因為第50 條訊息失敗的時候,應該退出循環,不再繼續進行消費。

如果是並發消費,如果出現了這種情況,建議是整批訊息全部重新消費,也就是給 ackIndex 賦值 0,這樣就必須考慮冥等問題。

以上是阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:51CTO.COM。如有侵權,請聯絡admin@php.cn刪除
使用Pydantic構建結構化研究自動化系統使用Pydantic構建結構化研究自動化系統Apr 24, 2025 am 10:32 AM

在學術研究的動態領域,有效的信息收集,綜合和演示至關重要。 文獻綜述的手動過程是耗時的,阻礙了更深入的分析。 多代理研究助理系統BUI

10 GPT-4O圖像生成會提示今天嘗試!10 GPT-4O圖像生成會提示今天嘗試!Apr 24, 2025 am 10:26 AM

AI世界中發生了絕對野生的事情。 Openai的本地形像生成現在很瘋狂。我們正在談論令人jaw目結舌的視覺效果,可怕的細節和拋光的輸出

用帆板編碼的氛圍指南用帆板編碼的氛圍指南Apr 24, 2025 am 10:25 AM

毫不費力地將您的編碼願景帶入Codeium's Windsurf,這是您的AI驅動的編碼伴侶。 Windsurf簡化了整個軟件開發生命週期,從編碼和調試到優化,將過程轉換為INTU

使用RMGB v2.0探索圖像背景刪除使用RMGB v2.0探索圖像背景刪除Apr 24, 2025 am 10:20 AM

Braiai的RMGB v2.0:強大的開源背景拆卸模型 圖像分割模型正在徹底改變各個領域,而背景刪除是進步的關鍵領域。 Braiai的RMGB v2.0是最先進的開源M

評估大語模型中的毒性評估大語模型中的毒性Apr 24, 2025 am 10:14 AM

本文探討了大語言模型(LLM)中的毒性至關重要問題以及用於評估和減輕它的方法。 LLM,為從聊天機器人到內容生成的各種應用程序提供動力,需要強大的評估指標,機智

Rag Reranker的綜合指南Rag Reranker的綜合指南Apr 24, 2025 am 10:10 AM

檢索增強發電(RAG)系統正在轉換信息訪問,但其有效性取決於檢索到的數據的質量。 這是重讀者變得至關重要的地方 - 充當搜索結果的質量過濾器,以確保僅確保

如何使用Gemma 3&Docling構建多模式抹布?如何使用Gemma 3&Docling構建多模式抹布?Apr 24, 2025 am 10:04 AM

該教程通過在Google Colab中構建精緻的多式聯運檢索一代(RAG)管道來指導您。 我們將使用Gemma 3(用於語言和視覺),文檔(文檔轉換),Langchain等尖端工具

可擴展AI和機器學習應用的射線指南可擴展AI和機器學習應用的射線指南Apr 24, 2025 am 10:01 AM

雷:擴展AI和Python應用程序的有力框架 Ray是一個革命性的開源框架,旨在輕鬆擴展AI和Python應用程序。 它的直觀API使研究人員和開發人員可以通過其代碼過渡

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

SublimeText3 英文版

SublimeText3 英文版

推薦:為Win版本,支援程式碼提示!

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

WebStorm Mac版

WebStorm Mac版

好用的JavaScript開發工具

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),