How to implement a delay queue in Redis

2023-05-26

    Redis delay queue

    Redis can implement a delayed message queue with a sorted set (ZSet): every ZSet member has a score, which can be used to store the time at which the task should run.

    However, the consumer has to poll in a loop to check for due tasks, which consumes system resources.

    import json
    import time


    class RedisDelayQueue(object):
        """Simple delay queue with a Redis backend.

        Usage:
            dq = RedisDelayQueue('delay:commtrans')
            dq.put(5, {'info': 'test 5555', 'time': timestamp_to_datetime_str(t + 5)})
        """

        def __init__(self, name, namespace='queue'):
            """The default connection parameters are: host='localhost', port=6379, db=0"""
            # get_redis_engine is a project-specific helper (not shown here) that returns a Redis client
            self.__db = get_redis_engine(database_name='spdb')
            self.key = '%s:%s' % (namespace, name)

        def qsize(self):
            """Return the approximate size of the queue."""
            return self.__db.zcard(self.key)

        def empty(self):
            """Return True if the queue is empty, False otherwise."""
            return self.qsize() == 0

        def rem(self, value):
            """Remove a task; return the number of members actually removed."""
            return self.__db.zrem(self.key, value)

        def get(self):
            """Return one due task, or None if nothing is due."""
            # Fetch a task: return one record whose score lies between 0 and the current time
            items = self.__db.zrangebyscore(self.key, 0, int(time.time()), start=0, num=1)
            if items:
                item = items[0]
                # Handle concurrency: whoever manages to delete the item gets to take it
                if self.rem(item):
                    return json.loads(item)
            return None

        def put(self, interval, item):
            """Put item into the queue.

            :param interval: delay in seconds
            """
            # Use the time as the score so the task queue is sorted by timestamp in ascending order
            d = json.dumps(item)
            return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})

    Optimizing the Redis-based delay queue

    Applications of delay queues

    Recently, in a new project in the development department, one of the key features was intelligent push: pushing reminder messages to users at specific points in time based on their behavior, as in the following business scenarios:

    • If the user taps a recharge item but does not complete the recharge within half an hour, push an unfinished-recharge reminder to the user.

    • Two hours after the user's last reading activity, push a continue-reading reminder to the user.

    • N minutes after a user registers or exits the app, push suitable recommendation messages to the user.

    What these scenarios have in common is that a specific task runs only after a certain delay following a triggering event. If the time of the triggering event is known, the same logic is equivalent to running the task at a specified point in time (event trigger time + delay length).
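
    As a small illustration of that equivalence, the execution time can be computed once, when the event fires, and stored with the task. This is only a sketch: the function name `execute_at_ms` is made up here, and the millisecond Unix timestamp is one possible representation of the execution time.

```python
from datetime import datetime, timedelta, timezone


def execute_at_ms(triggered_at: datetime, delay: timedelta) -> int:
    """Return the execution time (trigger time + delay) as a millisecond Unix timestamp."""
    return int((triggered_at + delay).timestamp() * 1000)


# Example: the user opened the recharge page at 2021-03-01 14:00:00 UTC,
# so the "recharge not completed" reminder is due half an hour later.
clicked = datetime(2021, 3, 1, 14, 0, 0, tzinfo=timezone.utc)
due_ms = execute_at_ms(clicked, timedelta(minutes=30))
```

    Storing the absolute time rather than the delay means the consumer only has to compare a stored number with the current time.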

    Delay queues are generally used to meet this kind of requirement. Each delayed message carries information such as the task's delay or its execution time. When the time condition is met and the task needs to run, the message is consumed. In other words, you can specify the point in time at which each message in the queue is consumed.

    Implementing a delay queue

    In a stand-alone environment, the JDK already ships with several components that can implement a delay queue, such as DelayQueue, Timer, and ScheduledExecutorService, all of which make it easy to create delayed tasks. However, these components generally keep tasks in memory, so tasks can be lost when the service restarts. The number of tasks is also limited by available memory, and pending tasks occupy memory for a long time, which makes this approach inflexible. It is usually suitable for single-process client programs or for projects without strict requirements on task reliability.
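
    To show what "tasks stored in memory" means in practice, here is a minimal single-process sketch. It is written in Python with `heapq` as a rough analogue of what an in-memory delay queue does, not the JDK API itself; the class name `InMemoryDelayQueue` and the optional `now` parameter (added so the demo is deterministic) are assumptions made for illustration.

```python
import heapq
import itertools
import time


class InMemoryDelayQueue:
    """Single-process delay queue: tasks live only in this process's memory."""

    def __init__(self):
        self._heap = []                # entries: (due_time, sequence, task)
        self._seq = itertools.count()  # tie-breaker so tasks themselves are never compared

    def put(self, delay_seconds, task, now=None):
        now = time.time() if now is None else now
        heapq.heappush(self._heap, (now + delay_seconds, next(self._seq), task))

    def pop_due(self, now=None):
        """Return every task whose due time has passed, earliest first."""
        now = time.time() if now is None else now
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


q = InMemoryDelayQueue()
q.put(30, "remind: recharge not completed", now=0)
q.put(10, "remind: continue reading", now=0)
early = q.pop_due(now=5)    # nothing is due yet
later = q.pop_due(now=30)   # both tasks are due, ordered by due time
```

    Everything in `_heap` disappears when the process exits, which is exactly the task-loss risk described above.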

    In a distributed environment, the JDK's own components alone cannot implement delay queues reliably and efficiently. It is usually necessary to introduce third-party middleware or frameworks.

    One example is the classic task scheduling framework Quartz, or other frameworks based on it such as xxl-job. The main purpose of these frameworks is to run scheduled or periodic tasks. Before Redis and RabbitMQ were widely used, common features such as cancelling orders that were not paid in time were implemented with scheduled tasks, which polled at regular intervals to check whether an order had reached the point in time that triggers execution.

    However, scheduled tasks run on a fixed period, and the scan interval is hard to choose. If it is too short, many scans find nothing to do and only add load to the system. If it is too long, tasks run noticeably later than intended, and a single scan may have to process a large backlog of records.
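
    A rough back-of-the-envelope sketch makes the trade-off concrete. The function name `polling_cost` is made up for illustration, and it assumes scans run at a fixed interval while a task may become due at any moment between two scans.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def polling_cost(interval_seconds: int):
    """Return (scans per day, worst-case lateness in seconds) for a fixed scan interval."""
    scans_per_day = SECONDS_PER_DAY // interval_seconds
    # A task that becomes due just after a scan waits almost one full interval.
    worst_case_lateness = interval_seconds
    return scans_per_day, worst_case_lateness


frequent = polling_cost(5)    # scan every 5 seconds
sparse = polling_cost(600)    # scan every 10 minutes
```

    Scanning every 5 seconds costs 17,280 scans per day for at most 5 seconds of lateness, while scanning every 10 minutes costs only 144 scans but lets a task run up to 10 minutes late.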

    Another common approach is to build the delay queue on an MQ, for example by delaying message delivery with RabbitMQ's TTL and dead letter queue. However, a message that has already been delivered to the MQ cannot easily be deleted or modified, so a task cannot be cancelled or moved to a different execution time, and messages cannot easily be deduplicated. For these reasons we did not use MQ to implement the delay queue in this project.

    Redis's zset data structure can also serve as a delay queue. It is more flexible and supports some features that MQ cannot provide, so the project finally uses Redis to implement the delay queue, with some optimization and encapsulation on top.

    The implementation relies on the score attribute of zset: Redis keeps the elements of a zset sorted by score in ascending order. Elements are added with the zadd command, as shown below. The value is the delayed task message, whose format can be defined by the business, and the score is the time at which the task should run, for example a 13-digit millisecond timestamp.

    zadd delayqueue 1614608094000 taskinfo

    Once tasks have been added, fetching them only requires selecting the elements of the zset whose score is no greater than the current timestamp; the result is the set of tasks that are due at the current time. This is done with the zrangebyscore command, as shown below, where timestamp is the current timestamp and limit caps the number of records pulled at a time, so that a single fetch does not return too many.

    zrangebyscore delayqueue 0 timestamp limit 0 1000
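
    The two commands, together with the cancel, reschedule, and deduplication behaviour that motivated choosing zset over MQ, can be sketched as follows. So that it runs without a server, the example uses `ZSetStandIn`, a hypothetical in-memory stand-in that mimics a single sorted set; with the redis-py client the corresponding calls would be `client.zadd(key, {member: score})`, `client.zrem(key, member)`, and `client.zrangebyscore(key, min, max, start=0, num=1000)`. The member names are invented for illustration.

```python
class ZSetStandIn:
    """In-memory stand-in for one Redis sorted set (illustration only)."""

    def __init__(self):
        self._scores = {}  # member -> score; a member can appear only once

    def zadd(self, member, score):
        """Add the member, or overwrite its score if it already exists."""
        self._scores[member] = score

    def zrem(self, member):
        """Remove the member; return 1 if it was present, else 0."""
        return 1 if self._scores.pop(member, None) is not None else 0

    def zrangebyscore(self, low, high, offset=0, count=None):
        """Members with low <= score <= high, ordered by (score, member)."""
        hits = sorted((s, m) for m, s in self._scores.items() if low <= s <= high)
        members = [m for _, m in hits]
        return members[offset:] if count is None else members[offset:offset + count]


queue = ZSetStandIn()
queue.zadd("order:1001:cancel-if-unpaid", 1614608094000)
queue.zadd("user:42:reading-reminder", 1614608100000)

# Dedup and reschedule: adding the same member again does not create a second
# task, it moves the existing one to the new execution time.
queue.zadd("user:42:reading-reminder", 1614608090000)

# Cancel: the order was paid, so its task is removed before it fires.
cancelled = queue.zrem("order:1001:cancel-if-unpaid")

now_ms = 1614608095000
due = queue.zrangebyscore(0, now_ms, offset=0, count=1000)
```

    After these calls only the rescheduled reading reminder is due; the cancelled order task never appears.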




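    @XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
    public void scanBusiness1() {
    	// Key of the zset delay queue for this piece of business logic
    	String zsetKey = "delayqueue:business1";
    	while (true) {
    		// Select elements whose score is no greater than the current timestamp, at most 1000 per pull
    		Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
    		if (CollectionUtils.isEmpty(tasks)) {
    			// No more tasks are due at the current time; end this scan
    			break;
    		}
    		for (String task : tasks) {
    			// Delete first, then execute, so that each task runs only once in a multi-threaded environment.
    			// The task is a member of the zset, so it is removed from the zset rather than deleted as a key.
    			Long removed = stringRedisTemplate.opsForZSet().remove(zsetKey, task);
    			if (removed != null && removed > 0) {
    				// After a successful removal, send the task to the designated MQ for asynchronous processing,
    				// decoupling "fetching the task" from "executing the task"
    				rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
    			}
    		}
    	}
    }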

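    @Scheduled(cron = "0/5 * * * * ?")
    public void scanBusiness1() {
    	// Number of queues M; consider making it configurable and keeping it roughly equal to the number of machines
    	int M = 10;
    	// Redis auto-increment key, used for load balancing
    	String incrKey = "incrkey:delayqueue:business1";
    	// On each run a machine pulls messages from a different zset, so that different machines access different zsets as far as possible
    	String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
    	while (true) {
    		// The logic here is the same as in the previous code and is omitted...
    	}
    }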
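
    The round-robin effect of the shared counter can be sketched in a few lines. Here `itertools.count(1)` stands in for the Redis counter (INCR returns 1 on first use), and the helper name `consumer_key` is made up for illustration. How producers spread tasks across the same M keys is not shown in the code above, so it is left out here as well.

```python
import itertools

M = 10  # number of zset shards, kept roughly equal to the number of machines


def consumer_key(counter_value: int, shards: int = M) -> str:
    """Pick the zset a scan reads from, given the value returned by the shared counter."""
    return "delayqueue:business1:%d" % (counter_value % shards)


# itertools.count stands in for the shared Redis counter in this offline demo.
counter = itertools.count(1)
scanned = [consumer_key(next(counter)) for _ in range(M)]
```

    Any M consecutive scans, regardless of which machine runs them, visit each of the M zsets exactly once, so the load is spread evenly even though no machine is pinned to a particular zset.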

    local zsetKey = 'delayqueue'
    local timestamp = 1614608094000
    local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1)
    if #items == 0 then
        return ''
    else
        redis.call('zremrangebyrank',zsetKey,0,0)
        return items[1]
    end
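
    From a Python client, a script like this could be invoked roughly as follows. This is a sketch under assumptions: `pop_due` is a made-up helper name, the call shape follows redis-py's `eval(script, numkeys, *keys_and_args)`, and `OfflineClient` is a stand-in that emulates only this one script so the example runs without a Redis server.

```python
POP_DUE_LUA = """
local items = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)
if #items == 0 then
    return ''
else
    redis.call('zremrangebyrank', KEYS[1], 0, 0)
    return items[1]
end
"""


def pop_due(client, key, now_ms):
    """Fetch and remove one due task in a single script call; None if nothing is due."""
    result = client.eval(POP_DUE_LUA, 1, key, now_ms)
    return result or None


class OfflineClient:
    """Stand-in for a Redis client; its eval() emulates only POP_DUE_LUA."""

    def __init__(self, scores):
        self.scores = dict(scores)  # member -> score

    def eval(self, script, numkeys, key, now_ms):
        due = sorted((s, m) for m, s in self.scores.items() if 0 <= s <= now_ms)
        if not due:
            return ''
        member = due[0][1]
        del self.scores[member]
        return member


client = OfflineClient({"task-a": 1614608094000, "task-b": 1614608099000})
first = pop_due(client, "delayqueue", 1614608095000)
second = pop_due(client, "delayqueue", 1614608095000)
```

    Because Redis runs a script as one atomic unit, no other client can fetch the same element between the read and the removal, which is why the separate "delete first, then execute" check is no longer needed. Note that a real client returns bytes unless it was created with `decode_responses=True`.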


    /**
     * Delayed message processing based on ZSET: the score stores the execution time, and once that time is reached the message is sent to the designated queue.
     * To use it, define a bean that extends this class.
     */
    public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {
    	private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
    	static {
    		// Lua script that fetches and deletes in one step, executed through the API provided by Spring
    		String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n" +
    				"if #items == 0 then\n" +
    				"\treturn ''\n" +
    				"else\n" +
    				"\tredis.call('zremrangebyrank',KEYS[1],0,0)\n" +
    				"\treturn items[1]\n" +
    				"end";
    		// In-house utility class; anything that can create an implementation of Spring's RedisScript will do
    		TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
    	}
    	private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
    			0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
    	private volatile boolean quit = false;
    	@Autowired
    	private StringRedisTemplate stringRedisTemplate;
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	@PostConstruct
    	public void startScan() {
    		// Once the bean has been constructed, start the worker threads
    		int threadNum = getThreadNum();
    		for (int i = 0; i < threadNum; i++) {
    			EXECUTOR.execute(this);
    		}
    	}
    	@Override
    	public void run() {
    		while (!quit) {
    			try {
    				// Loop: use the Lua script to fetch a task that is due now and delete it from Redis
    				String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
    						Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
    				if (StringUtils.isNotBlank(msg)) {
    					// A non-empty message means a task was obtained; send it to the designated MQ for asynchronous
    					// processing, decoupling "fetching the task" from "executing the task"
    					rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
    				} else {
    					// No task obtained, so nothing is due right now; sleep for 1 s (adjust as needed)
    					Thread.sleep(1000);
    				}
    			} catch (Exception e) {
    				Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
    			}
    		}
    	}
    	@Override
    	public void destroy() throws Exception {
    		quit = true;
    	}
    	public void setQuit(boolean quit) {
    		this.quit = quit;
    	}
    	/** Number of worker threads that fetch messages */
    	protected abstract int getThreadNum();
    	/** Thread name prefix, to make troubleshooting easier */
    	protected abstract String getThreadNamePrefix();
    	/** Name of the ZSET queue that holds the delayed messages */
    	protected abstract String getDelayedMsgSourceKey();
    	/** When a message reaches its execution time, it is sent to the real-time consumer queue through this exchange */
    	protected abstract String getSendExchange();
    	/** When a message reaches its execution time, it is sent to the real-time consumer queue with this routingKey */
    	protected abstract String getSendRoutingKey();
    }
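
    The core of the worker loop, which keeps pulling while tasks are due and pauses only when nothing is due, can be sketched independently of Spring. The names `scan_loop`, `fake_pop_due`, `pending`, and `sent` are made up for illustration; a list plays the role of the zset and another list plays the role of the MQ so that the example runs on its own.

```python
import threading


def scan_loop(pop_due, dispatch, stop, idle_seconds=1.0):
    """Keep pulling due tasks; pause only when nothing is due."""
    while not stop.is_set():
        try:
            msg = pop_due()
            if msg:
                dispatch(msg)            # hand the task off, e.g. publish it to an MQ
            else:
                stop.wait(idle_seconds)  # idle pause that also wakes up early on stop
        except Exception as exc:         # keep the worker alive after a failed round
            print("delayed msg scan error:", exc)


# Demo with stand-ins: `pending` plays the zset, `sent` plays the MQ.
pending = ["task-1", "task-2", "task-3"]
sent = []
stop = threading.Event()


def fake_pop_due():
    if pending:
        return pending.pop(0)
    stop.set()  # demo only: stop once the backlog is drained
    return None


worker = threading.Thread(target=scan_loop, args=(fake_pop_due, sent.append, stop, 0.01))
worker.start()
worker.join(timeout=5)
```

    Compared with a fixed-period scheduled scan, this loop drains a backlog without waiting between tasks and adds at most one idle interval of lateness when the queue is empty.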





