Home > Article > Technology peripherals > Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?
Hello everyone, I am Brother Jun.
Recently, a reader was asked a question when participating in an interview. If a consumer pulls a batch of messages, such as 100, the consumption of the 100th message is successful, but the consumption of the 50th message fails. The offset How will it be updated? Regarding this issue, let’s talk today about how to save the offset if a batch of messages fails to be consumed.
Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
The above DefaultMQPushConsumer is a push mode consumer, and the startup method is start. After the consumer is started, the rebalance thread (RebalanceService) will be triggered. The task of this thread is to continuously rebalance in an infinite loop, and finally encapsulate the request to pull the message into the pullRequestQueue. The UML class diagram involved in this process is as follows:
After encapsulating the pull request PullRequest, RocketMQ will not Continuously obtain message pull requests from pullRequestQueue for processing. The UML class diagram is as follows:
The entry method for pulling messages is an infinite loop, the code is as follows:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
After pulling the message here, submit it to PullCallback is handled by this callback function.
The pulled message is first put into msgTreeMap in ProcessQueue, and then encapsulated into the thread class ConsumeRequest for processing. After streamlining the code, the ConsumeRequest processing logic is as follows:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
The code for concurrent message processing consumption results is simplified as follows:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
As can be seen from the above code, if the logic of processing messages is serial, for example, the code at the beginning of the article uses a for loop to process messages, then if the processing of a certain message fails, exit the loop directly and give The ackIndex variable of ConsumeConcurrentlyContext is assigned the position of the failed message in the message list, so that the messages following this failed message will no longer be processed and will be sent to the Broker to wait for re-pulling. The code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Successfully consumed messages are removed from msgTreeMap in ProcessQueue, and the smallest offset (firstKey) in msgTreeMap is returned for update. Note: The cluster mode offset is stored on the Broker side. To update the offset, a message needs to be sent to the Broker. However, the broadcast mode offset is stored on the Consumer side and only the local offset needs to be updated.
If the logic of processing messages is parallel, it is meaningless to assign a value to ackIndex after the message processing fails, because multiple messages may fail, and assigning a value to the ackIndex variable is not accurate. The best way is to assign a value of 0 to ackIndex and consume the entire batch of messages again, which may cause other problems.
For sequential messages, after taking the message from msgTreeMap, it must first be placed on consumingMsgOrderlyTreeMap. When updating the offset, the largest message offset is taken from consumingMsgOrderlyTreeMap ( lastKey).
Going back to the initial question, if a batch of messages is consumed in order, it is impossible for the 100th message to be consumed successfully, but the 50th message to fail, because When the 50th message fails, the loop should be exited and consumption should not continue.
If it is concurrent consumption, if this situation occurs, it is recommended that the entire batch of messages be consumed again, that is, assign a value of 0 to ackIndex. In this way, issues such as underworld must be considered.
The above is the detailed content of Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?. For more information, please follow other related articles on the PHP Chinese website!