안녕하세요 여러분 준형님 입니다.
한 독자가 최근 인터뷰 중에 질문을 받았습니다. 소비자가 100개와 같은 메시지 배치를 가져오고 100번째 메시지가 성공적으로 소비되었지만 50번째 메시지가 실패하는 경우 오프셋은 어떻게 업데이트됩니까? 이 문제와 관련하여 오늘은 일괄 메시지 소비에 실패할 경우 오프셋을 저장하는 방법에 대해 이야기해 보겠습니다.
RocketMQ 푸시 모드를 예로 들면 RocketMQ 소비자 시작 코드는 다음과 같습니다.
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
위의 DefaultMQPushConsumer는 푸시 모드 소비자이며 시작 방법은 start입니다. 소비자가 시작된 후 재조정 스레드(RebalanceService)가 트리거됩니다. 이 스레드의 작업은 무한 루프에서 지속적으로 재조정하고 마지막으로 메시지를 pullRequestQueue로 가져오는 요청을 캡슐화하는 것입니다. 이 프로세스에 포함된 UML 클래스 다이어그램은 다음과 같습니다.
풀 요청 PullRequest를 캡슐화한 후 RocketMQ는 처리를 위해 pullRequestQueue에서 메시지 풀 요청을 지속적으로 가져옵니다. UML 클래스 다이어그램은 다음과 같습니다.
메시지를 가져오는 입력 방법은 무한 루프이며 코드는 다음과 같습니다.
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
여기에서 메시지를 가져온 후 PullCallback 콜백 함수에 제출됩니다. 처리.
가져온 메시지는 먼저 ProcessQueue의 msgTreeMap에 넣은 다음 처리를 위해 스레드 클래스 ConsumeRequest에 캡슐화됩니다. 코드를 간소화한 후 ConsumeRequest 처리 로직은 다음과 같습니다.
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
동시 메시지로 소비 결과를 처리하는 코드는 다음과 같이 단순화됩니다.
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
에서 볼 수 있듯이 위 코드, 메시지가 처리되는 경우 논리는 직렬입니다. 예를 들어 기사 시작 부분의 코드는 for 루프를 사용하여 메시지를 처리하는 데 실패하면 루프를 직접 종료하고 ackIndex 변수를 할당합니다. ConsumeConcurrentlyContext를 메시지 목록에서 실패한 메시지 위치로 설정하면 첫 번째 실패한 메시지 다음의 메시지는 더 이상 처리되지 않고 다시 가져오기를 기다리기 위해 브로커로 전송됩니다. 코드는 다음과 같습니다.
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
성공적으로 소비된 메시지는 ProcessQueue의 msgTreeMap에서 제거되고 msgTreeMap의 가장 작은 오프셋(firstKey)이 업데이트를 위해 반환됩니다. 참고: 클러스터 모드 오프셋은 브로커 측에 저장됩니다. 오프셋을 업데이트하려면 메시지를 브로커에 전송해야 합니다. 그러나 브로드캐스트 모드 오프셋은 소비자 측에 저장되며 로컬 오프셋만 업데이트하면 됩니다.
메시지 처리 논리가 병렬인 경우 여러 메시지가 실패할 수 있고 ackIndex 변수에 값을 할당하는 것이 정확하지 않기 때문에 메시지 처리가 실패한 후 ackIndex에 값을 할당하는 것은 의미가 없습니다. 가장 좋은 방법은 ackIndex에 0 값을 할당하고 전체 메시지 배치를 다시 사용하는 것입니다. 그러면 다른 문제가 발생할 수 있습니다.
순차 메시지의 경우 msgTreeMap에서 메시지를 가져온 후 먼저 소비MsgOrderlyTreeMap에 배치해야 합니다. 오프셋을 업데이트할 때 최대 메시지 오프셋(lastKey)은 소비MsgOrderlyTreeMap에서 가져옵니다.
원래 질문으로 돌아가서, 메시지 배치를 순서대로 소비하면 100번째 메시지는 성공적으로 소비되는 것이 불가능하지만 50번째 메시지는 실패합니다. 종료되어야 하며 소비가 더 이상 지속되어서는 안 됩니다.
동시 소비라면 이런 상황이 발생하면 전체 메시지 배치를 다시 소비하는 것이 좋습니다. 즉, ackIndex에 0 값을 할당하므로 언더월드와 같은 문제를 고려해야 합니다.
위 내용은 Alibaba의 두 번째 페이지: RocketMQ 소비자가 일괄 메시지를 가져오지만 일부는 오프셋을 업데이트하는 데 실패합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!