• 技术文章 >数据库 >Redis

    浅析Redis中怎么使用消息队列

    青灯夜游青灯夜游2022-01-19 09:32:36转载407
    本篇文章带大家了解一下Redis进阶用法-消息队列,介绍一下Redis中的延时队列,希望对大家有所帮助!

    说到消息队列中间件,我们都会想到RabbitMQ、RocketMQ和Kafka,来给应用实现异步消息传递的功能。这些都是专业的消息队列中间件,其特性之多超出了我们的理解能力。

    而这些消息中间件使用起来的是复杂的,例如RabbitMQ,发消息之前要创建Exchange,还要创建Queue,然后将Exchange和Queue通过某种规则绑定起来,发送消息的时候还要制定routing-key,还要 控制头消息。这仅是生产者,消费者在消费消息之前也要将上面一系列的繁琐步骤再操作一遍。

    那么对于那些并不要求百分百可靠,并且希望实现简单的消息队列需求时,我们可以通过Redis将我们从消息队列的中间件的繁琐步骤中解脱出来。

    Redis的消息队列不是专业的消息队列,他并没有消息队列中许多的高级特性,也没有ack保证。如果对消息的可靠性有着极致的追求,请转向专业的MQ中间件。【相关推荐:Redis视频教程

    异步消息队列

    从最简单的异步消息队列开始,Redis的list数据结构常用来作为异步消息队列,通过lrpush/lpush来操作入列,通过rpop/lpop来出列。

    1.png

    问题一:空队列

    对于pop操作来说,当消息队列空了的时候,客户端会陷入pop的死循环,造成大量的浪费生命的空轮询,导致客户端CPU拉高,同时Redis的QPS也被拉高。

    对于以上问题的解决办法就是通过list结构的blpop/brpop来操作出列,其中b前缀代表的就是blocking,阻塞读。对于阻塞读在队列没有数据的时候就会进入休眠状态,一旦数据到来就会立刻醒来。完美的解决了上面这个问题。

    问题二:空闲连接断开

    阻塞读的方案看似完美,紧接着引出了另外一个问题:空闲连接。 如果线程一直阻塞在哪哪里,Redis的客户端连接就变成了空闲连接。空闲时间过长,Redis服务器就会主动断开连接,以减少闲置资源占用。这时候blpop/brpop就会抛出异常来。

    所以,我们在编写客户端(应用程序)消费者的时候需要小心,注意捕获异常,并进行重试。

    应用一:延时队列

    在Redis的分布式锁中一般有三种策略来处理加锁失败的情况:

    而Redis中延时队列,我们可以通过zset(有序列表)数据结构来实现。我们将消息序列化作为一个字符串作为zse的value,而消息的到期处理时间(延时时间)作为score。然后通过轮询zset获取到期时间进行处理,通过zrem将key从zset移除代表成功消费,进而处理任务。

    核心代码如下:

    // 生产\
    public void delay(T msg) {\
      TaskItem task = new TaskItem();\
      task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
      task.msg = msg;\
      String s = JSON.toJSONString(task); // fastjson 序列化\
      jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\
    }\
    // 消费\
    public void loop() {\
      while (!Thread.interrupted()) {\
       // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\
       Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
       if (values.isEmpty()) {\
         try {\
           Thread.sleep(500); // 歇会继续\
        }\
         catch (InterruptedException e) {\
           break;\
        }\
         continue;\
      }\
       String s = values.iterator().next();  //消费队列\
       if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\
         TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
         this.handleMsg(task.msg);\
      }\
    }\
    }

    以上的代码在多线程中对于同一个任务被多个线程争抢的情况,虽然能够通过zrem后在处理任务来避免一个任务被多次消费的情况。但是对于那些获取到了任务但是没有成功消费的线程来说,都是白白浪费时间取了一次任务。所以可以考虑通过lua scripting来优化这个逻辑。将zrangeByScore和zrem一同挪到服务器进行原子操作,就能够完美解决了。

    更多编程相关知识,请访问:编程入门!!

    以上就是浅析Redis中怎么使用消息队列的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:掘金社区,如有侵犯,请联系admin@php.cn删除
    专题推荐:Redis 消息队列
    上一篇:一文了解Redis中的哨兵模式 下一篇:深入了解Redis中的Codis
    VIP课程(WEB全栈开发)

    相关文章推荐

    • 【腾讯云】年中优惠,「专享618元」优惠券!• 深入浅析Redis中的布隆过滤器(Bloom Filter)• 聊聊Redis中AOF的潜在阻塞点(总结)• 聊聊Redis中的GEO地理位置模块• 详细解析Redis中的持久化机制• php7 yum安装redis的方法• 一文聊聊Redis中的限流策略
    1/1

    PHP中文网