Alibaba의 두 번째 페이지: RocketMQ 소비자가 일괄 메시지를 가져오지만 일부는 오프셋을 업데이트하는 데 실패합니다.
안녕하세요 여러분 준형님 입니다.
한 독자가 최근 인터뷰 중에 질문을 받았습니다. 소비자가 100개와 같은 메시지 배치를 가져오고 100번째 메시지가 성공적으로 소비되었지만 50번째 메시지가 실패하는 경우 오프셋은 어떻게 업데이트됩니까? 이 문제와 관련하여 오늘은 일괄 메시지 소비에 실패할 경우 오프셋을 저장하는 방법에 대해 이야기해 보겠습니다.
1 메시지 가져오기
1.1 풀 요청 캡슐화
RocketMQ 푸시 모드를 예로 들면 RocketMQ 소비자 시작 코드는 다음과 같습니다.
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
위의 DefaultMQPushConsumer는 푸시 모드 소비자이며 시작 방법은 start입니다. 소비자가 시작된 후 재조정 스레드(RebalanceService)가 트리거됩니다. 이 스레드의 작업은 무한 루프에서 지속적으로 재조정하고 마지막으로 메시지를 pullRequestQueue로 가져오는 요청을 캡슐화하는 것입니다. 이 프로세스에 포함된 UML 클래스 다이어그램은 다음과 같습니다.
1.2 풀 요청 처리
풀 요청 PullRequest를 캡슐화한 후 RocketMQ는 처리를 위해 pullRequestQueue에서 메시지 풀 요청을 지속적으로 가져옵니다. UML 클래스 다이어그램은 다음과 같습니다.
메시지를 가져오는 입력 방법은 무한 루프이며 코드는 다음과 같습니다.
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
여기에서 메시지를 가져온 후 PullCallback 콜백 함수에 제출됩니다. 처리.
가져온 메시지는 먼저 ProcessQueue의 msgTreeMap에 넣은 다음 처리를 위해 스레드 클래스 ConsumeRequest에 캡슐화됩니다. 코드를 간소화한 후 ConsumeRequest 처리 로직은 다음과 같습니다.
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 소비 결과 처리
2.1 동시 메시지
동시 메시지로 소비 결과를 처리하는 코드는 다음과 같이 단순화됩니다.
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
에서 볼 수 있듯이 위 코드, 메시지가 처리되는 경우 논리는 직렬입니다. 예를 들어 기사 시작 부분의 코드는 for 루프를 사용하여 메시지를 처리하는 데 실패하면 루프를 직접 종료하고 ackIndex 변수를 할당합니다. ConsumeConcurrentlyContext를 메시지 목록에서 실패한 메시지 위치로 설정하면 첫 번째 실패한 메시지 다음의 메시지는 더 이상 처리되지 않고 다시 가져오기를 기다리기 위해 브로커로 전송됩니다. 코드는 다음과 같습니다.
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
성공적으로 소비된 메시지는 ProcessQueue의 msgTreeMap에서 제거되고 msgTreeMap의 가장 작은 오프셋(firstKey)이 업데이트를 위해 반환됩니다. 참고: 클러스터 모드 오프셋은 브로커 측에 저장됩니다. 오프셋을 업데이트하려면 메시지를 브로커에 전송해야 합니다. 그러나 브로드캐스트 모드 오프셋은 소비자 측에 저장되며 로컬 오프셋만 업데이트하면 됩니다.
메시지 처리 논리가 병렬인 경우 여러 메시지가 실패할 수 있고 ackIndex 변수에 값을 할당하는 것이 정확하지 않기 때문에 메시지 처리가 실패한 후 ackIndex에 값을 할당하는 것은 의미가 없습니다. 가장 좋은 방법은 ackIndex에 0 값을 할당하고 전체 메시지 배치를 다시 사용하는 것입니다. 그러면 다른 문제가 발생할 수 있습니다.
2.2 순차 메시지
순차 메시지의 경우 msgTreeMap에서 메시지를 가져온 후 먼저 소비MsgOrderlyTreeMap에 배치해야 합니다. 오프셋을 업데이트할 때 최대 메시지 오프셋(lastKey)은 소비MsgOrderlyTreeMap에서 가져옵니다.
3 요약
원래 질문으로 돌아가서, 메시지 배치를 순서대로 소비하면 100번째 메시지는 성공적으로 소비되는 것이 불가능하지만 50번째 메시지는 실패합니다. 종료되어야 하며 소비가 더 이상 지속되어서는 안 됩니다.
동시 소비라면 이런 상황이 발생하면 전체 메시지 배치를 다시 소비하는 것이 좋습니다. 즉, ackIndex에 0 값을 할당하므로 언더월드와 같은 문제를 고려해야 합니다.
위 내용은 Alibaba의 두 번째 페이지: RocketMQ 소비자가 일괄 메시지를 가져오지만 일부는 오프셋을 업데이트하는 데 실패합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

Hiddenlayer의 획기적인 연구는 LLMS (Leading Lange Language Models)에서 중요한 취약점을 드러냅니다. 그들의 연구 결과는 "정책 인형극"이라는 보편적 인 바이 패스 기술을 보여줍니다.

환경 책임과 폐기물 감소에 대한 추진은 기본적으로 비즈니스 운영 방식을 바꾸는 것입니다. 이 혁신은 제품 개발, 제조 프로세스, 고객 관계, 파트너 선택 및 새로운 채택에 영향을 미칩니다.

Advanced AI 하드웨어에 대한 최근 제한은 AI 지배에 대한 확대 된 지정 학적 경쟁을 강조하여 중국의 외국 반도체 기술에 대한 의존도를 드러냅니다. 2024 년에 중국은 3,800 억 달러 상당의 반도체를 수입했습니다.

Google의 Chrome의 잠재적 인 강제 매각은 기술 산업 내에서 강력한 논쟁을 불러 일으켰습니다. OpenAi가 65%의 글로벌 시장 점유율을 자랑하는 주요 브라우저를 인수 할 가능성은 TH의 미래에 대한 중요한 의문을 제기합니다.

전반적인 광고 성장을 능가 함에도 불구하고 소매 미디어의 성장은 느려지고 있습니다. 이 성숙 단계는 생태계 조각화, 비용 상승, 측정 문제 및 통합 복잡성을 포함한 과제를 제시합니다. 그러나 인공 지능

깜박 거리는 스크린 모음 속에서 정적으로 오래된 라디오가 딱딱합니다. 이 불안정한 전자 제품 더미, 쉽게 불안정하게, 몰입 형 전시회에서 6 개의 설치 중 하나 인 "The-Waste Land"의 핵심을 형성합니다.

Google Cloud의 다음 2025 : 인프라, 연결 및 AI에 대한 초점 Google Cloud의 다음 2025 회의는 수많은 발전을 선보였으며 여기에서 자세히 설명하기에는 너무 많았습니다. 특정 공지 사항에 대한 심도있는 분석은 My의 기사를 참조하십시오.

이번 주 AI 및 XR : AI 구동 창의성의 물결은 음악 세대에서 영화 제작에 이르기까지 미디어와 엔터테인먼트를 통해 휩쓸고 있습니다. 헤드 라인으로 뛰어 들자. AI 생성 콘텐츠의 영향력 증가 : 기술 컨설턴트 인 Shelly Palme


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

Eclipse용 SAP NetWeaver 서버 어댑터
Eclipse를 SAP NetWeaver 애플리케이션 서버와 통합합니다.

DVWA
DVWA(Damn Vulnerable Web App)는 매우 취약한 PHP/MySQL 웹 애플리케이션입니다. 주요 목표는 보안 전문가가 법적 환경에서 자신의 기술과 도구를 테스트하고, 웹 개발자가 웹 응용 프로그램 보안 프로세스를 더 잘 이해할 수 있도록 돕고, 교사/학생이 교실 환경 웹 응용 프로그램에서 가르치고 배울 수 있도록 돕는 것입니다. 보안. DVWA의 목표는 다양한 난이도의 간단하고 간단한 인터페이스를 통해 가장 일반적인 웹 취약점 중 일부를 연습하는 것입니다. 이 소프트웨어는

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

VSCode Windows 64비트 다운로드
Microsoft에서 출시한 강력한 무료 IDE 편집기
