Home  >  Article  >  PHP Framework  >  Swoole asynchronous programming practice: creating a high-performance queuing system

Swoole asynchronous programming practice: creating a high-performance queuing system

WBOY
WBOYOriginal
2023-06-13 09:11:531362browse

With the rapid development of Internet applications, more and more companies are beginning to use asynchronous programming to improve code performance and application efficiency. Swoole is a powerful asynchronous programming framework for PHP, with high performance, high concurrency and excellent scalability. In this article, we will introduce how to use Swoole to build a high-performance queuing system.

First of all, we need to understand what a queuing system is. The queuing system is a service overall scheduling system that improves the response speed of services and the concurrent processing capabilities of the system by queuing management and scheduling of various services. In practical applications, queuing systems are usually used to implement functions such as high concurrent access, asynchronous task scheduling, load balancing, etc. Therefore, their high performance and high availability are necessary.

Next, we will use the following requirements as an example to explain how to use Swoole to build a high-performance queuing system:

  1. Supports multiple queues and can manage queues ;
  2. Supports the addition and execution of tasks, and can manage the status of tasks;
  3. Supports multiple consumers to process tasks, and can manage consumers;
  4. Support task retry and timeout processing;
  5. Support asynchronous processing and synchronous processing of tasks.

Now, let’s get down to business and start using Swoole to build this high-performance queuing system.

1. Introducing Swoole

First of all, we need to introduce Swoole into the project. Here we can easily introduce Swoole dependencies through Composer.

composer require swoole/swoole

2. Build queue

In the queuing system, the queue is the core structure for storing tasks. We need to build a queue and add tasks to the queue. Here we use Redis as the queue storage method and use the PHP Redis extension to operate the queue.

  1. Create Redis connection

Before using Redis, we need to create a connection with Redis first. Here we create a Redis connection pool to manage Redis connections.

use SwooleCoroutineChannel;

class RedisPool
{

private $max;
private $pool;

public function __construct($max = 100)
{
    $this->max = $max;
    $this->pool = new Channel($max);
}

public function get($config)
{
    if (!$this->pool->isEmpty()) {
        return $this->pool->pop();
    }

    $redis = new Redis();
    $redis->connect($config['host'], $config['port']);
    $redis->select($config['db']);
    
    return $redis;
}

public function put($redis)
{
    if ($this->pool->length() < $this->max) {
        $this->pool->push($redis);
    } else {
        $redis->close();
    }
}

}

  1. Create Queue

Connect Next, we can create a queue class to manage queue operations, including task addition, task acquisition, and task deletion.

class Queue
{

private $redis;

public function __construct($config)
{
    $this->redis = (new RedisPool())->get($config);
}

public function push($queueName, $data)
{
    $this->redis->lpush($queueName, $data);
}

public function pop($queueName)
{
    return $this->redis->rpop($queueName);
}

public function del($queueName, $data)
{
    $this->redis->lrem($queueName, -1, $data);
}

}

3. Implement task execution

After adding a task to the queue, we need a task executor to perform tasks. Here we use coroutines to implement asynchronous execution of tasks, and use Worker processes to improve task execution efficiency.

  1. Create Worker process

In Swoole, we can use Worker process to implement multi-process processing tasks. Here we create a Worker process to handle the task.

$worker = new SwooleProcessWorker();

  1. Create a coroutine executor

Next, we can create a coroutine executor to handle Task. Here we use coroutines to implement asynchronous task execution, and use Golang-style coroutine pools to improve the efficiency of concurrent processing.

class CoroutineExecutor
{

private $pool;
private $redisConfig;

public function __construct($maxCoroutineNum, $redisConfig)
{
    $this->pool = new SwooleCoroutineChannel($maxCoroutineNum);
    $this->redisConfig = $redisConfig;

    for ($i = 0; $i < $maxCoroutineNum; $i++) {
        $this->pool->push(new Coroutine());
    }
}

public function execute($callback, $data)
{
    $coroutine = $this->pool->pop();
    $coroutine->execute($callback, $data, $this->redisConfig);
    $this->pool->push($coroutine);
}

}

  1. Creating a coroutine

Next, we can create a coroutine to perform tasks.

class Coroutine
{

private $redis;

public function __construct()
{
    $this->redis = null;
}

public function execute($callback, $data, $config)
{
    if (!$this->redis) {
        $this->redis = (new RedisPool())->get($config);
    }
    
    Coroutine::create(function () use ($callback, $data) {
        call_user_func($callback, $this->redis, $data);
    });
}

}

4. Create a service

Finally, we can use Swoole to create a service to provide queue query and Functionality added by tasks.

  1. Implement queue management

We can use Swoole's HTTP Server to implement service port monitoring and perform queue management through HTTP requests. Here we provide interfaces for list acquisition, task deletion and task addition.

  1. Realize task execution

We can use Swoole's TaskWorker process to implement task execution. By dispatching tasks to the TaskWorker process, the TaskWorker process executes the tasks asynchronously.

class Task
{

public function execute($worker, $workerId, $taskId, $taskData)
{
    $executor = new CoroutineExecutor(64, [
        'host' => '127.0.0.1',
        'port' => 6379,
        'db' => 0
    ]);
    $executor->execute($taskData['callback'], $taskData['data']);

    return true;
}

}

  1. Implement service startup

Finally, we can implement service startup and monitoring port, and start the TaskWorker process to perform the task.

$http = new SwooleHttpServer("127.0.0.1", 9501);
$http->on('start', function () {

echo "Server started

";
});

$http->on('request', function ($request, $response) {

$queue = new Queue([
    'host' => '127.0.0.1',
    'port' => 6379,
    'db' => 0
]);

switch ($request->server['request_uri']) {
    case '/queue/list':
        // 获取队列列表
        break;
    case '/queue/delete':
        // 删除任务
        break;
    case '/queue/add':
        $data = json_decode($request->rawContent(), true);
        $queue->push($data['queue'], $data['data']);
        $http->task([
            'callback' => function ($redis, $data) {
                // 任务执行逻辑
            },
            'data' => $data
        ]);
        break;
    default:
        $response->status(404);
        $response->end();
        break;
}

});

$http-> ;on('task', function ($http, $taskId, $workerId, $data) {

$task = new Task();
$result = $task->execute($http, $workerId, $taskId, $data);

return $result;

});

$http->on('finish', function ($http, $taskId, $data) {

// 任务执行完成逻辑

});

$http->start();

5. Summary

This article introduces how to use Swoole to implement a high-performance queuing system. Through Swoole's coroutines and Worker processes, we can achieve high-performance processing of asynchronous tasks, and achieve efficient task management and scheduling through the Redis storage structure. Such a queuing system can be widely used in functional scenarios such as asynchronous task scheduling, high concurrent access, load balancing, etc. It is a solution worthy of promotion and use.

The above is the detailed content of Swoole asynchronous programming practice: creating a high-performance queuing system. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn