


Alibaba second-round interview: a RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How is the offset updated?
Hello everyone, I am Brother Jun.
Recently, a reader was asked a question in an interview. Suppose a consumer pulls a batch of messages, say 100. The 100th message is consumed successfully, but the 50th message fails. How will the offset be updated? With that question in mind, let's talk today about how the offset is saved when part of a batch fails to be consumed.
1 Pull message
1.1 Encapsulate pull request
Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}
The DefaultMQPushConsumer above is a push mode consumer, and it is started with the start method. Starting the consumer triggers the rebalance thread (RebalanceService). This thread rebalances continuously in an infinite loop and, as its final step, wraps each message pull as a PullRequest and puts it into pullRequestQueue. The UML class diagram involved in this process is as follows:
1.2 Processing pull requests
After the pull requests (PullRequest) have been encapsulated, RocketMQ continuously takes them from pullRequestQueue for processing. The UML class diagram is as follows:
The entry method for pulling messages is an infinite loop, the code is as follows:
// PullMessageService
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Blocks until a pull request is available
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
After the messages are pulled, they are handed to the PullCallback callback function for processing.
The pulled message is first put into msgTreeMap in ProcessQueue, and then encapsulated into the thread class ConsumeRequest for processing. After streamlining the code, the ConsumeRequest processing logic is as follows:
// ConsumeMessageConcurrentlyService.java
public void run() {
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    try {
        // 1. Run the consumption logic, i.e. the listener defined in the code at the beginning of the article
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        // exception handling omitted
    }
    if (!processQueue.isDropped()) {
        // 2. Process the consumption result
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}
2 Processing consumption results
2.1 Concurrent messages
The code for concurrent message processing consumption results is simplified as follows:
// ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();
    switch (status) {
        case CONSUME_SUCCESS:
            // ackIndex can never point past the last message of the batch
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            break;
        case RECONSUME_LATER:
            // The whole batch counts as failed
            ackIndex = -1;
            break;
        default:
            break;
    }
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                // Broadcast mode does not retry; the failed message is only logged (omitted)
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            // Every message after ackIndex is sent back to the Broker
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            // Messages that could not be sent back must stay in the ProcessQueue
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
            }
            break;
        default:
            break;
    }
    // Remove the handled messages and get the offset to commit
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
As can be seen from the above code, ackIndex marks the last message in the batch that counts as successfully consumed: every message after index ackIndex is sent back to the Broker to wait for re-pulling. So if the messages are processed serially, for example with a for loop over the batch, the listener can exit the loop as soon as one message fails and set the ackIndex variable of ConsumeConcurrentlyContext to the index of the last message that succeeded (i - 1 when message i fails). The failed message and all messages after it are then no longer processed in this round and are sent back to the Broker. The code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (int i = 0; i < msgs.size(); i++) {
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs.get(i));
                } catch (Exception e) {
                    // Message i failed: acknowledge only the messages before it
                    context.setAckIndex(i - 1);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}
The messages in the consume request, both those consumed successfully and those sent back to the Broker, are removed from msgTreeMap in ProcessQueue, and the smallest offset still left in msgTreeMap (firstKey) is returned and used to update the offset. Messages that could not be sent back stay in msgTreeMap, so the offset cannot move past them. Note: in cluster mode the offset is stored on the Broker side, so updating it requires sending a request to the Broker. In broadcast mode the offset is stored on the Consumer side and only the local offset needs to be updated.
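To make the offset rule concrete, here is a minimal sketch in plain Java. ProcessQueueSketch and its methods are hypothetical stand-ins, not the RocketMQ classes, and the sketch assumes that an empty map yields "largest offset seen + 1" as the value to commit. It shows that a single message left behind in the map pins the committed offset at that message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified, hypothetical model of ProcessQueue; not the RocketMQ class itself.
class ProcessQueueSketch {
    // queue offset -> message body, ordered by offset like msgTreeMap
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    // largest offset ever put into the queue
    private long queueOffsetMax = -1L;

    void putMessage(long offset, String body) {
        msgTreeMap.put(offset, body);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes the given offsets and returns the offset to commit: the smallest
    // offset still pending, or queueOffsetMax + 1 when nothing is pending
    // (an assumption of this sketch). Returns -1 when the map was already empty.
    long removeMessages(List<Long> offsets) {
        if (msgTreeMap.isEmpty()) {
            return -1L;
        }
        long result = queueOffsetMax + 1;
        for (Long offset : offsets) {
            msgTreeMap.remove(offset);
        }
        if (!msgTreeMap.isEmpty()) {
            result = msgTreeMap.firstKey();
        }
        return result;
    }

    // Pulls offsets 0..99 and removes every offset except `stuck`.
    static long commitOffsetWhenOneStays(long stuck) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        List<Long> done = new ArrayList<>();
        for (long o = 0; o < 100; o++) {
            pq.putMessage(o, "msg-" + o);
            if (o != stuck) {
                done.add(o);
            }
        }
        return pq.removeMessages(done);
    }

    public static void main(String[] args) {
        System.out.println(commitOffsetWhenOneStays(49)); // prints 49
        System.out.println(commitOffsetWhenOneStays(-1)); // prints 100
    }
}
```

In the first call the message at offset 49 (the 50th message) stays in the map, so the offset to commit is 49 even though offsets 50 to 99 are done. In the second call nothing stays behind and the offset moves to 100.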
If the messages are processed in parallel, assigning a value to ackIndex after a failure is meaningless: several messages may fail at different positions, and a single index cannot describe that. The safest way is to have the entire batch consumed again, that is, set ackIndex to -1 (or simply return RECONSUME_LATER), so that the loop starting at ackIndex + 1 covers every message. Consuming the whole batch again may cause other problems, such as messages being processed more than once.
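A minimal sketch of this all-or-nothing approach follows. It does not depend on RocketMQ: Status is a hypothetical stand-in for ConsumeConcurrentlyStatus, and handle is a made-up business handler. As far as I can tell from the RocketMQ source, returning RECONSUME_LATER resets ackIndex to -1, which is why the sketch signals the retry through the return value.

```java
import java.util.Arrays;
import java.util.List;

class ParallelBatchSketch {
    // Hypothetical stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hypothetical business handler: returns false when a message cannot be processed.
    static boolean handle(String msg) {
        return !msg.startsWith("bad");
    }

    // Processes the batch in parallel; a single failure makes the whole batch retry.
    // allMatch may stop early, which is fine because the batch is redelivered anyway.
    static Status consumeBatch(List<String> msgs) {
        boolean allOk = msgs.parallelStream().allMatch(ParallelBatchSketch::handle);
        return allOk ? Status.CONSUME_SUCCESS : Status.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        System.out.println(consumeBatch(Arrays.asList("a", "b", "c")));     // prints CONSUME_SUCCESS
        System.out.println(consumeBatch(Arrays.asList("a", "bad-b", "c"))); // prints RECONSUME_LATER
    }
}
```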
2.2 Sequential messages
For sequential messages, a message taken from msgTreeMap is first placed in consumingMsgOrderlyTreeMap. When the offset is updated, the largest message offset in consumingMsgOrderlyTreeMap (lastKey) is used.
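The two-map flow can be sketched as follows. OrderlyQueueSketch is a hypothetical, simplified model, not the RocketMQ class. Two things in it are assumptions of mine rather than statements from this article: commit returns lastKey + 1 (the next offset to consume), and rollback puts a failed batch back into msgTreeMap.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of the two maps used for sequential consumption.
class OrderlyQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final TreeMap<Long, String> consumingMsgOrderlyTreeMap = new TreeMap<>();

    void putMessage(long offset, String body) {
        msgTreeMap.put(offset, body);
    }

    // Moves up to batchSize messages, lowest offsets first, into the consuming map.
    int takeMessages(int batchSize) {
        int taken = 0;
        while (taken < batchSize && !msgTreeMap.isEmpty()) {
            Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
            consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
            taken++;
        }
        return taken;
    }

    // Batch succeeded: returns lastKey + 1 (assumed to be the next offset to
    // consume) and clears the consuming map. Returns -1 if nothing is in flight.
    long commit() {
        if (consumingMsgOrderlyTreeMap.isEmpty()) {
            return -1L;
        }
        long last = consumingMsgOrderlyTreeMap.lastKey();
        consumingMsgOrderlyTreeMap.clear();
        return last + 1;
    }

    // Batch failed (assumption): put the in-flight messages back so they are taken again.
    void rollback() {
        msgTreeMap.putAll(consumingMsgOrderlyTreeMap);
        consumingMsgOrderlyTreeMap.clear();
    }

    public static void main(String[] args) {
        OrderlyQueueSketch q = new OrderlyQueueSketch();
        for (long o = 0; o < 10; o++) {
            q.putMessage(o, "msg-" + o);
        }
        q.takeMessages(5);
        System.out.println(q.commit()); // prints 5
        q.takeMessages(5);
        q.rollback();                   // offsets 5..9 go back to msgTreeMap
        q.takeMessages(5);
        System.out.println(q.commit()); // prints 10
    }
}
```

Because messages only leave the consuming map on commit or rollback, the offset never skips a message that has not been consumed, which is what sequential consumption requires.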
3 Summary
Going back to the initial question: if a batch of messages is consumed in order, it is impossible for the 100th message to succeed while the 50th message fails, because when the 50th message fails, the loop should exit and consumption should not continue.
With concurrent consumption this situation can occur, and the recommendation is to have the entire batch consumed again, that is, set ackIndex to -1 (or return RECONSUME_LATER). In that case, issues such as idempotence must be considered.
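Re-consuming a whole batch means that messages which already succeeded are delivered again, so the handler has to tolerate duplicates. A minimal sketch of one way to do that is shown here. The class, the in-memory key set, and the idea of using a message key such as an order ID are all assumptions for illustration; a real system would usually keep the keys in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentHandlerSketch {
    // Keys of messages whose side effect has already been applied.
    // Kept in memory only for brevity (an assumption of this sketch).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how often the business logic actually ran.
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Applies the side effect at most once per key; returns true if it ran this time.
    boolean handleOnce(String messageKey) {
        if (!processedKeys.add(messageKey)) {
            return false; // duplicate delivery, skip the side effect
        }
        sideEffects.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handleOnce("order-1"));  // prints true
        System.out.println(handler.handleOnce("order-1"));  // prints false
        System.out.println(handler.sideEffectCount());      // prints 1
    }
}
```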