首頁 >科技週邊 >人工智慧 >阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

WBOY
WBOY轉載
2023-04-12 23:28:141036瀏覽

大家好,我是君哥。

最近有讀者參加面試時被問了一個問題,如果消費者拉取了一批訊息,例如100 條,第100 條訊息消費成功了,但是第50 條消費失敗,偏移量會怎樣更新?就著這個問題,今天來聊聊一下,如果一批訊息有消費失敗的狀況時,偏移量怎麼保存。

1 拉取訊息

1.1 封裝拉取請求

#以RocketMQ 推模式為例,RocketMQ 消費者啟動程式碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}

上面的DefaultMQPushConsumer 是一個推模式的消費者,啟動方法是start。消費者啟動後會觸發重平衡線程(RebalanceService),這個線程的任務是在死循環中不停地進行重平衡,最終封裝拉取訊息的請求到 pullRequestQueue。這個過程涉及的UML 類別圖如下:

阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

1.2 處理拉取請求

封裝好拉取訊息的請求PullRequest 後,RocketMQ 就會不停地從pullRequestQueue 取得訊息拉取請求進行處理。 UML 類別圖如下:

阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?

拉取訊息的入口方法是一個死循環,程式碼如下:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}

這裡拉取到訊息後,提交給PullCallback 這個回呼函數進行處理。

拉取到的訊息先被 put 到 ProcessQueue 中的 msgTreeMap 上,然後被封裝到 ConsumeRequest 這個執行緒類別來處理。把程式碼精簡後,ConsumeRequest 處理邏輯如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}

2 處理消費結果

2.1 並發訊息

並發訊息處理消費結果的程式碼做精簡後如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}

從上面的程式碼可以看出,如果處理訊息的邏輯是串列的,例如文章開頭的程式碼使用for 迴圈來處理訊息,那如果在某一訊息處理失敗了,直接退出循環,給ConsumeConcurrentlyContext 的ackIndex 變數賦值為訊息清單中失敗訊息的位置,這樣這條失敗訊息後面的訊息就不再處理了,發送給Broker 等待重新拉取。程式碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}

消費成功的訊息則從 ProcessQueue 中的 msgTreeMap 中移除,並且傳回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:叢集模式偏移量保存在 Broker 端,更新偏移量需要傳送訊息到 Broker,而廣播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果處理訊息的邏輯是並行的,處理訊息失敗後給 ackIndex 賦值是沒有意義的,因為可能有多條訊息失敗,給 ackIndex 變數賦值並不準確。最好的方法就是給 ackIndex 賦值 0,整批訊息全部重新消費,這樣又可能帶來冥等問題。

2.2 順序訊息

對於順序訊息,從msgTreeMap 取出訊息後,先要放到consumingMsgOrderlyTreeMap 上面,更新偏移量時,是從consumingMsgOrderlyTreeMap 上取最大的訊息偏移量( lastKey)。

3 總結

回到開頭的問題,如果一批訊息按照順序消費,是不可能出現第100 條訊息消費成功了,但第50 條消費失敗的情況,因為第50 條訊息失敗的時候,應該退出循環,不再繼續進行消費。

如果是並發消費,如果出現了這種情況,建議是整批訊息全部重新消費,也就是給 ackIndex 賦值 0,這樣就必須考慮冥等問題。

以上是阿里二面:RocketMQ 消費者拉取一批訊息,其中部分消費失敗了,偏移量怎麼更新?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:51cto.com。如有侵權,請聯絡admin@php.cn刪除