>  기사  >  Java  >  Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석

Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석

王林
王林앞으로
2023-04-23 09:49:062137검색

메시지 축적

메시지 축적의 생성 시나리오:

  • 생산자가 생성하는 메시지의 속도는 소비자의 소비 속도보다 빠릅니다. 해결책: 소비자의 수나 속도를 늘리십시오.

  • 소비할 소비자가 없을 때. 해결 방법: 배달 못한 편지 대기열, 메시지 유효 기간을 설정합니다. 이는 메시지의 유효 기간을 설정하는 것과 같습니다. 지정된 시간 내에 소비가 없으면 자동으로 만료됩니다. 만료되면 클라이언트 콜백 모니터링 방법이 실행되어 메시지를 데이터베이스 테이블 레코드에 저장합니다. 보상은 나중에 실현될 것입니다.

메시지가 손실되지 않도록 보장

1. 생산자는 메시지 확인 메커니즘을 사용하여 메시지가 MQ에 100% 성공적으로 전달될 수 있는지 확인합니다.

2. MQ 서버는 메시지를 하드 디스크에 유지해야 합니다.

3. 소비자는 수동 확인 메커니즘을 사용하여 메시지 소비가 성공했는지 확인합니다.

MQ 서버 용량이 가득 찬 경우 어떻게 해야 합니까?

배달 못한 편지 대기열을 사용하여 데이터베이스에 메시지를 저장하고 나중에 소비를 보상합니다.

데드 레터 큐

RabbitMQ 데드 레터 큐는 일반적으로 스페어 타이어 큐로 알려져 있습니다. 메시지 미들웨어가 어떤 이유로든 메시지를 거부하면 데드 레터 큐에 저장될 수도 있습니다. 스위치 및 라우팅 키 등.

배경:

  • MQ에 전달된 메시지가 만료되었습니다

  • 큐가 최대 길이에 도달했습니다(큐 컨테이너가 가득 찼습니다). 생산자가 메시지 수신을 거부했습니다.

  • 소비자가 여러 개를 소비하지 못했습니다. 메시지는 배달 못한 편지 대기열

코드 사례:

maven 종속성

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml 구성

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

queue 구성 클래스

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

잘못된 편지 대기열 소비자

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

일반 대기열 소비자

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

에 전송되어 저장됩니다. 백그라운드 큐 관리 페이지는 다음과 같습니다.

Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석

배포 방법: 배달 못한 편지 큐는 일반 큐와 동일한 서버에 존재할 수 없으며 별도의 서버에 저장되어야 합니다.

Delay Queue

주문이 30분 동안 결제되지 않으면 시스템이 자동으로 종료되고 종료되도록 구현 계획입니다.

작업 스케줄링을 기반으로 구현되어 효율성이 매우 낮습니다.

redis 만료된 키를 기반으로 구현되었습니다. 키가 만료되면 메서드가 클라이언트로 다시 호출됩니다.

사용자가 주문하면 30분 동안 토큰(유효 기간)이 생성되어 Redis에 저장됩니다. 단점: 매우 중복되어 중복 필드가 테이블에 저장됩니다.

MQ 기반 지연 대기열(최상의 솔루션) RabbitMQ 상황.

원리: 주문할 때 mq에 메시지를 전달하고 유효 기간을 30분으로 설정합니다. 그러나 메시지가 만료되면(소비되지 않고) 클라이언트에서 메시지가 완료되었음을 알리는 메서드를 실행합니다. 만료되었습니다. 이때 주문이 결제되었는지 확인하세요.

구현 논리:

주로 배달 못한 편지 대기열을 사용하여 구현됩니다.

Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석

원하는 코드: 일반 소비자가 메시지를 소비하지 않거나 일반 소비자가 없는 경우 설정된 시간 후에 데드 레터 대기열에 들어간 후 데드 레터 소비자가 해당 비즈니스 로직을 구현합니다.

RabbitMQ 메시지 멱등성 문제

RabbitMQ 메시지 자동 재시도 메커니즘

소비자 비즈니스 로직 코드에서 예외가 발생하면 재시도가 자동으로 구현됩니다(기본값은 무수한 재시도)

RabbitMQ 수에 제한이 있어야 합니다 재시도(예: 매번 3초 간격으로 최대 5회 재시도). 재시도가 여러 번 실패하면 나중에 수동 보상을 기록하기 위해 배달 못한 편지 대기열에 저장되거나 데이터베이스 테이블에 저장됩니다. 실패한 재시도 횟수 이후에는 대기열이 자동으로 메시지를 삭제하기 때문입니다.

메시지 재시도 원칙: 재시도 과정에서 aop를 사용하여 소비 모니터링 방법을 가로채면 이 오류 로그가 인쇄되지 않습니다. 여러 번 재시도한 후 실패하면 최대 실패 횟수에 도달한 경우에만 오류 로그가 인쇄됩니다.

사용이 여러 번 실패하는 경우:

1. 메시지를 자동으로 삭제합니다(메시지가 손실될 수 있음).

해결책:

보완이 여러 번 실패하면 결국 배달 못한 편지 대기열에 저장됩니다. 테이블 로깅을 통해 소비 실패 오류 로그를 기록하고 나중에 메시지를 수동으로 보정합니다.

재시도 메커니즘의 합리적인 선택

소비자가 메시지를 얻은 후 제3자 인터페이스(HTTP 요청)를 호출하지만, 제3자 인터페이스 호출이 실패하면 어떻게 될까요? 다시 시도해야 합니까?

답변: 때때로 네트워크 예외로 인해 호출이 실패하고 여러 번 다시 시도해야 할 수도 있습니다.

소비자가 메시지를 받은 후 코드 문제로 인해 데이터 예외가 발생합니다. 재시도해야 합니까?

답변: 재시도할 필요는 없습니다. 코드가 비정상이라면 코드 릴리스 프로젝트를 다시 수정해야 합니다.

소비자가 수동 ack 모드를 켭니다

첫 번째 단계인 springboot 프로젝트 구성에서 ack 모드를 켜야 합니다

acknowledge-mode: manual

두 번째 단계, 소비자 Java 코드
int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

어떻게 하나요? RabbitMQ는 메시지 전력 문제를 해결합니다.

메시지 멱등성이란 무엇인가요? MQ 소비자는 어떻게 멱등성을 보장합니까?

원인: 소비자가 자동 ​​재시도를 켤 수 있으며, 재시도 프로세스로 인해 소비자의 비즈니스 로직 코드가 반복적으로 실행될 수 있습니다. 현재 메시지가 소비되었습니다. 비즈니스 오류로 인해 메시지가 다시 소비되었으므로 이때 해결 방법이 나타납니다. 비즈니스 ID에 따라 메시지 글로벌 ID를 사용하십시오. id), 소비자는 메시지가 소비되었음을 판단할 수 있습니다.

소비자 코드 논리:

Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석RabbitMQ는 분산 트랜잭션 문제를 해결합니다.

분산 트랜잭션: 분산 시스템에서는 서비스 간 호출 인터페이스로 인해 여러 개의 서로 다른 트랜잭션이 있으며 각 트랜잭션은 서로 영향을 미치지 않습니다. 분산 트랜잭션에 문제가 있습니다.

분산 트랜잭션의 핵심 아이디어인 데이터의 최종 일관성을 해결하세요.

분산 분야의 명사:

강한 일관성: 동기화 속도가 매우 빠르거나 잠금 메커니즘이 더티 읽기를 허용하지 않습니다.

강력한 일관성 솔루션: 데이터베이스 A가 데이터를 데이터 B에 매우 빠르게 동기화하거나 데이터베이스 B입니다. 데이터베이스 A의 동기화가 완료되기 전에는 데이터를 읽을 수 없습니다.

약한 일관성: 읽기가 허용되는 데이터는 원본 더티 데이터이며, 읽은 결과는 불일치가 허용됩니다.

최종 일관성: 분산 시스템에서는 데이터가 네트워크를 통해 동기적으로 전달되기 때문에 짧은 데이터 지연은 허용되지만 최종 데이터는 일관성이 있어야 합니다.

RabbitMQ를 기반으로 분산 트랜잭션을 해결하는 아이디어

RabbitMQ를 기반으로 분산 트랜잭션을 해결하는 아이디어: (최종 일관성 솔루션 채택)

    Producer 메시지가 MQ로 전달되어야 함을 확인합니다(메시지 확인 메커니즘) 전달이 실패하면 계속해서 다시 시도하세요
  • 소비자는 수동 확인을 사용하여 메시지를 확인하여 소비를 실현합니다. 소비 실패의 경우 mq는 자동으로 소비자의 재시도를 돕습니다.
  • 생산자의 첫 번째 트랜잭션이 먼저 실행되는지 확인하세요. 실행이 실패하면 보충 주문 대기열이 사용됩니다(생산자의 첫 번째 트랜잭션이 실행되도록 보장하기 위해 생산자 자체 트랜잭션을 보완하기 위해 [최종 데이터 일관성]).
  • 솔루션 맵: 핵심은 mq를 사용하여 다른 시스템에 메시지를 보내 데이터를 다시 수정하는 것입니다.

위 내용은 Java RabbitMQ 메시지 대기열의 일반적인 문제 및 해결 방법 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제