>데이터 베이스 >Redis >Redis에서 메시지 대기열을 사용하는 방법에 대한 간략한 분석

Redis에서 메시지 대기열을 사용하는 방법에 대한 간략한 분석

青灯夜游
青灯夜游앞으로
2022-01-05 09:57:352731검색

이 글은 Redis - 메시지 큐의 고급 사용법을 안내하고 Redis의 지연 큐를 소개합니다. 도움이 되길 바랍니다!

Redis에서 메시지 대기열을 사용하는 방법에 대한 간략한 분석

메시지 큐 미들웨어에 대해 말하면 우리 모두는 애플리케이션에 대한 비동기 메시징 기능을 구현하기 위해 RabbitMQ, RocketMQ 및 Kafka를 생각합니다. 이는 우리가 이해할 수 있는 것보다 더 많은 기능을 갖춘 특수 메시지 대기열 미들웨어입니다.

RabbitMQ와 같은 메시지 미들웨어는 메시지를 보내기 전에 Exchange와 대기열을 생성한 다음 몇 가지 규칙을 통해 Exchange와 대기열을 바인딩해야 합니다. 메시지를 보낼 때 라우팅을 공식화해야 합니다. . -key, 헤더 메시지도 제어합니다. 이는 생산자에게만 해당됩니다. 소비자도 메시지를 소비하기 전에 위의 일련의 번거로운 단계를 다시 거쳐야 합니다.

따라서 100% 신뢰성을 요구하지 않고 간단한 메시지 대기열 요구 사항을 구현하려는 사람들을 위해 Redis를 사용하면 메시지 대기열 미들웨어의 번거로운 단계에서 벗어날 수 있습니다.

Redis의 메시지 큐는 전문적인 메시지 큐가 아니며 메시지 큐에 고급 기능이 많지도 않고 ack 보장도 없습니다. 메시지 신뢰성을 궁극적으로 추구한다면 전문 MQ 미들웨어를 선택하십시오. [관련 권장 사항: Redis 동영상 튜토리얼]

비동기 메시지 대기열

가장 간단한 비동기 메시지 대기열에서 시작하여 Redis 목록 데이터 구조는 일반적으로 비동기 메시지 대기열로 사용되며 lrpush/lpush를 통해 대기열에 추가됩니다. /lpop은 대기열에서 제거합니다.

Redis에서 메시지 대기열을 사용하는 방법에 대한 간략한 분석

문제 1: 빈 대기열

팝 작업의 경우 메시지 대기열이 비어 있으면 클라이언트가 팝의 무한 루프에 빠져 생명을 낭비하는 빈 폴링이 많이 발생하여 클라이언트가 CPU가 올라가고, Redis의 QPS도 올라갑니다.

위 문제에 대한 해결책은 목록 구조의 blpop/brpop을 사용하여 큐에서 빼는 것입니다. 여기서 b 접두사는 차단, 차단 읽기를 나타냅니다. 읽기 차단의 경우 대기열에 데이터가 없으면 절전 상태로 들어가고 데이터가 도착하자마자 깨어납니다. 위의 문제를 완벽하게 해결합니다.

문제 2: 유휴 연결이 끊어졌습니다

읽기 차단 솔루션은 완벽해 보이지만 즉시 또 다른 문제인 유휴 연결로 이어집니다. 스레드가 어딘가에서 계속 차단되면 Redis 클라이언트 연결은 유휴 연결이 됩니다. 유휴 시간이 너무 길면 Redis 서버는 유휴 리소스 사용량을 줄이기 위해 적극적으로 연결을 끊습니다. 이때 blpop/brpop은 예외를 발생시킵니다.

그러므로 클라이언트(애플리케이션) 소비자를 작성할 때 주의하고 예외 포착에 주의하고 재시도해야 합니다.

애플리케이션 1: 지연 대기열

Redis 분산 잠금에는 일반적으로 잠금 실패를 처리하는 세 가지 전략이 있습니다.

  • 예외를 직접 발생시키고 프런트 엔드에서 작업을 계속할지 여부를 사용자에게 알려줍니다.

  • 잠시 후 다시 시도하세요.
  • 요청을 지연 대기열에 넣고 잠시 후 다시 시도하세요.
  • Redis의 지연 대기열에 대해서는 zset(순서 목록) 데이터를 사용할 수 있습니다. 달성할 수 있는 구조. zse 값으로 메시지를 문자열로 직렬화하고, 메시지의 만료 처리 시간(지연 시간)을 점수로 직렬화합니다. 그런 다음 zset을 폴링하여 처리 만료 시간을 얻고, zrem을 통해 zset에서 키를 제거하여 성공적인 소비를 나타낸 다음 작업을 처리합니다.

핵심 코드는 다음과 같습니다.

// 生产\
public void delay(T msg) {\
  TaskItem task = new TaskItem();\
  task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
  task.msg = msg;\
  String s = JSON.toJSONString(task); // fastjson 序列化\
  jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\
}\
// 消费\
public void loop() {\
  while (!Thread.interrupted()) {\
   // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\
   Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
   if (values.isEmpty()) {\
     try {\
       Thread.sleep(500); // 歇会继续\
    }\
     catch (InterruptedException e) {\
       break;\
    }\
     continue;\
  }\
   String s = values.iterator().next();  //消费队列\
   if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\
     TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
     this.handleMsg(task.msg);\
  }\
}\
}

위 코드는 동일한 작업이 여러 스레드에 의해 경쟁되는 다중 스레드 상황에서 사용됩니다. 작업이 여러 번 소비되는 것을 피하기 위해 zrem 이후에 작업을 처리할 수 있습니다. 그러나 작업을 획득했지만 성공적으로 사용하지 못한 스레드의 경우 작업을 획득하는 것은 시간 낭비였습니다. 따라서 lua 스크립팅을 통해 이 논리를 최적화하는 것을 고려할 수 있습니다. 원자적 작업을 위해 zrangeByScore와 zrem을 서버로 함께 이동하면 문제가 완벽하게 해결됩니다.

더 많은 프로그래밍 관련 지식을 보려면

프로그래밍 소개

를 방문하세요! !

위 내용은 Redis에서 메시지 대기열을 사용하는 방법에 대한 간략한 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제