
Detailed explanation of how to implement redis queue priority code example

伊谢尔伦 (Original) · 2017-07-17 09:51

How to use Redis as a message queue

First of all, Redis was designed as a cache, but thanks to some of its characteristics it can also serve as a message queue. The key is its handful of blocking APIs; it is precisely these blocking APIs that give it the ability to act as a message queue.

Just imagine: under the mindset of "the database solves all problems", you could meet this need without a message queue at all. Store every task in the database and process them by polling continuously. That approach works, but it is very crude. If your database interface offered a blocking method, however, you could avoid the polling entirely, and your database could serve as a message queue too; current databases simply do not provide such an interface.

In addition, other features of a message queue, such as FIFO ordering, are easy to implement: you only need a List object, fetching data from the head and pushing data onto the tail.

Redis can serve as a message queue thanks to the blocking list commands blpop and brpop, as well as some of its Pub/Sub (publish/subscribe) commands. Because these are blocking versions, they can be used to build a message queue.

Priority Workarounds in RabbitMQ

There are many mature message queue products, such as RabbitMQ. It is fairly simple to use, relatively rich in features, and completely sufficient for ordinary situations. One very annoying thing, though, is that it does not support priorities out of the box.

For example, in an email-sending task, some privileged users expect their mail to go out more promptly, or at least ahead of ordinary users' mail. By default RabbitMQ cannot handle this: tasks thrown at RabbitMQ are consumed FIFO, first in, first out. But we can support these priorities with a workaround: create multiple queues and set up corresponding routing rules for the RabbitMQ consumers.

Suppose by default there is one queue, which we model as a list [task1, task2, task3]; consumers take tasks out one by one, FIFO, and process them. If a high-priority task arrives, it can only be processed last: [task1, task2, task3, hightask1]. But with two queues, a high-priority queue and a normal-priority queue, the normal queue holds [task1, task2, task3] and the high queue holds [hightask1]; we then set the consumer's routing so that it fetches data from either queue at random.

We can also define a consumer that specializes in the high-priority queue and does not touch the low-priority queues even when idle. This is similar to a bank's VIP counter: ordinary customers queue up by taking a number, and when a VIP arrives, even though he does not take a ticket from the number machine ahead of the ordinary customers, he can still go directly through the faster VIP channel.

Using RabbitMQ to support priority queues this way is just like the bank VIP example: different channels. However, this approach only gives relative priority; it cannot achieve absolute priority control. Suppose I want a certain high-priority task to be processed before all ordinary tasks in an absolute sense. The scheme above cannot do that, because a RabbitMQ consumer only knows to pick the first item from whichever of its queues it happens to look at when free; it cannot control which queue it drains first. Finer-grained priority control, or a system with more than ten priority levels, is likewise difficult to achieve with RabbitMQ this way.

But if you use Redis as the queue, the above requirements can all be met.

Why a message queue is needed

Introducing a message queue mechanism is a significant improvement to a system. For example, in a web system, after a user performs some operation, an email notification needs to be sent to the user's mailbox. You could send the mail synchronously and make the user wait for it before returning feedback, but network uncertainty may force the user to wait a long time, hurting the user experience.

In some scenarios it is impossible to wait synchronously at all, because the operation needs a lot of background time. An extreme example: an online compilation system whose background compilation takes 30 minutes. Such a scenario cannot wait synchronously and then respond; it must respond to the user first, process asynchronously, and then, once processing finishes, notify the user again as appropriate.

In addition, message queues suit situations where the system's processing capacity is limited. The queue temporarily stores tasks, and the system then processes the queued tasks one by one. In this way, bursts of highly concurrent tasks can be handled stably even when the system's throughput is insufficient.

In short, a message queue is a queuing mechanism: wherever the system needs queuing, a message queue can serve.

Redis message queue priority implementation

Some basic Redis knowledge

redis> blpop tasklist 0
"im task 01"

This example uses the blpop command to fetch the first element of the tasklist list in a blocking manner; the last parameter is the wait timeout, and 0 means wait indefinitely. Also, values stored in Redis can only be strings, so tasks must be passed as strings. We simply serialize the task payload into a JSON string and deserialize it on the consumer side.
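As a minimal sketch of that serialization step (the function names here are illustrative, not from the article), a task can be packed into a JSON string before pushing and parsed back on the consumer side:

```python
import json

def encode_task(task_id, priority, payload):
    # Redis list values are plain strings, so pack the task as JSON.
    return json.dumps({"id": task_id, "priority": priority, "payload": payload})

def decode_task(raw):
    # Consumer side: turn the string fetched via brpop back into a dict.
    return json.loads(raw)

raw = encode_task(1, "high", "send email to vip user")
task = decode_task(raw)
print(task["priority"])  # high
```

The string returned by `encode_task` is what you would `lpush` onto the task list.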

Our example language here is Python, and the Redis client library is redis-py. If you have some programming experience, switching the examples to your favorite language should be no problem.

1. Simple FIFO queue

import redis, time

def handle(task):
    print(task)
    time.sleep(4)

def main():
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    r = redis.Redis(connection_pool=pool)
    while 1:
        result = r.brpop('tasklist', 0)
        handle(result[1])

if __name__ == "__main__":
    main()

The above example is about the simplest possible consumer: an infinite loop keeps fetching data from the Redis queue. If the queue is empty, it blocks there with no timeout; if data arrives, it is taken out and processed.

Normally what we fetch would be a structured string that needs parsing before being handed to the processing function, but for simplicity our example uses a plain string. Also, the processing function does no real work; it just sleeps to simulate a time-consuming operation.

Now open another Redis client to simulate the producer; the built-in redis-cli is enough. Push some data into the tasklist queue.

redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'

Then on the consumer side, you will see these simulated tasks being consumed one by one.

2. Simple priority queue

Assume a simple requirement: high-priority tasks only need to be processed before low-priority ones, and the ordering among the rest does not matter. In that case we just push a high-priority task to the front of the queue instead of the back.

Because our queue is a Redis list, this is easy to implement: since the consumer pops from the tail with brpop, use rpush for a high-priority task and lpush for an ordinary one.

redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 02'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'

You will then see that the high-priority tasks are always executed before the low-priority ones. The disadvantage of this scheme, however, is that among themselves the high-priority tasks execute in first-in, last-out order.
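A quick local simulation of this push logic (using a Python deque in place of the Redis list; this is illustrative, not the article's code) shows both the benefit and the drawback:

```python
from collections import deque

queue = deque()  # left end = where lpush inserts, right end = where brpop removes

def lpush(item):
    queue.appendleft(item)   # ordinary task: goes to the back of the line

def rpush(item):
    queue.append(item)       # high-priority task: jumps to the front

def brpop():
    return queue.pop()       # consumer always takes from the right end

for cmd, item in [(lpush, "task1"), (lpush, "task2"),
                  (rpush, "high1"), (rpush, "high2")]:
    cmd(item)

order = [brpop() for _ in range(len(queue))]
print(order)  # high tasks come out first, but high2 before high1 (LIFO among them)
```

The output is `['high2', 'high1', 'task1', 'task2']`: high-priority tasks beat ordinary ones, yet are reversed among themselves, which is exactly the flaw the next section fixes.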

3. A more complete queue

In Example 2, high-priority tasks are simply put at the front of the queue, and low-priority tasks are put at the end. This does not guarantee the order between high-priority tasks.

If all tasks were high priority, their execution order would be reversed, which obviously violates the queue's FIFO principle.

However, with a slight improvement our queue can fix this.

As with RabbitMQ, we set up two queues: a high-priority queue and a low-priority queue. High-priority tasks go into the high queue, low-priority tasks into the low queue. What sets Redis apart from RabbitMQ is that Redis lets the consumer specify which queue to read from first.

def main():
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    r = redis.Redis(connection_pool=pool)
    while 1:
        result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
        handle(result[1])

The code above fetches data blockingly from the two queues 'high_task_queue' and 'low_task_queue'. brpop checks the keys in the order given: if the first queue has data, it is taken from there; only when the first queue is empty does it fall back to the second.

So we only need to make such improvements to the queue consumer to achieve the goal.

redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue high001
redis> lpush high_task_queue high002
redis> lpush high_task_queue high003
redis> lpush high_task_queue high004

Through this test we can see that high-priority tasks are executed first, and FIFO order is preserved among the high-priority tasks themselves.

With this scheme we can support priority queues with several tiers, such as high, middle, and low, or even more levels.
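The multi-key behavior of brpop can be modeled locally like this (a dict of deques stands in for the Redis server; this sketch is an illustration of the ordering, not redis-py itself):

```python
from collections import deque

# Local model of brpop with multiple keys: scan queues in priority order
# and pop from the first non-empty one.
queues = {
    "high_task_queue": deque(),
    "low_task_queue": deque(),
}

def lpush(name, item):
    queues[name].appendleft(item)

def brpop_multi(names):
    for name in names:
        if queues[name]:
            return name, queues[name].pop()
    return None  # a real brpop would block here instead of returning

for item in ["low001", "low002"]:
    lpush("low_task_queue", item)
for item in ["high001", "high002"]:
    lpush("high_task_queue", item)

order = []
while any(queues.values()):
    order.append(brpop_multi(["high_task_queue", "low_task_queue"])[1])
print(order)  # ['high001', 'high002', 'low001', 'low002']
```

All high tasks drain first, and within each queue FIFO order holds, matching what the redis-cli test above demonstrates.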

4. Situation with many priority levels

Suppose the priority is not a simple fixed set of levels like high/medium/low or 0-10, but a range as large as 0-99999. Then our third scheme is no longer suitable.

Although Redis has a sortable data type, the sorted set, unfortunately it offers no blocking interface for it. So we can only use the list type and reach the goal another way.

A simple approach is to keep a single queue and ensure it stays sorted by priority. To enqueue a task, use binary search to find the appropriate position, then insert it there (for example with the linsert command, using the element at the found position as the pivot).

For example, suppose the queue holds tasks with priorities [1, 3, 6, 8, 9, 14] and a task with priority 7 arrives. Using our own binary search, we read elements from the queue one at a time, compare them with the target, compute the correct position, and insert the task at that position.

Because binary search is fast and Redis itself lives in memory, the speed should in theory be acceptable. But if the data volume is really large, we can tune it further.
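The position-finding step can be sketched locally with Python's bisect module (a plain list stands in for the Redis list here; against a real server you would read elements with lindex and insert with linsert):

```python
import bisect

priorities = [1, 3, 6, 8, 9, 14]  # queue kept sorted by priority

def insert_position(prio):
    # Binary search for the index where a task of this priority belongs.
    return bisect.bisect_left(priorities, prio)

pos = insert_position(7)
priorities.insert(pos, 7)  # against Redis: linsert before the element at `pos`
print(pos, priorities)     # 3 [1, 3, 6, 7, 8, 9, 14]
```

The priority-7 task lands at index 3, between 6 and 8, so the queue stays sorted.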

Recall our third scheme: combining it with this one greatly reduces the overhead. For example, take a queue with 100,000 items whose priorities are spread randomly over 0-100,000. We can set up 10 or 100 separate queues: priorities 0-10,000 go into queue 1, 10,000-20,000 into queue 2, and so on. With the single queue split by level this way, each queue holds far less data, so the binary search matches much faster. The memory the data occupies stays basically the same; 100,000 items take about the same space, there are simply more queues in the system.
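That bucketing rule can be sketched as a small routing function (the queue names and bucket size here are assumptions for illustration, not from the article):

```python
NUM_QUEUES = 10
MAX_PRIORITY = 100_000
BUCKET = MAX_PRIORITY // NUM_QUEUES  # 10,000 priorities per queue

def queue_for(priority):
    # Map a priority in [0, MAX_PRIORITY] to one of the split queues;
    # the producer pushes to this key, keeping each list small.
    index = min(priority // BUCKET, NUM_QUEUES - 1)
    return f"task_queue_{index}"

print(queue_for(42))      # task_queue_0
print(queue_for(15_000))  # task_queue_1
print(queue_for(99_999))  # task_queue_9
```

A consumer would then pass the queue names to brpop from highest bucket to lowest, and the binary-search insertion from above runs within each small bucket.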

