Home >Backend Development >PHP7 >Implementing coroutines in PHP7

Implementing coroutines in PHP7

Guanhui
Guanhuiforward
2020-05-02 11:08:203094browse

Preface

I believe everyone has heard of the concept of "coroutine".

But some students don’t understand this concept, and don’t know how to implement it, how to use it, and where to use it. Some people even think that yield is coroutine!

I always believe that if you cannot accurately express a knowledge point, I can think that you just don’t understand.

If you have learned about using PHP to implement coroutines before, you must have read Brother Niao’s article: Using coroutines to implement multi-task scheduling in PHP | Fengxuezhiyu

Brother Niao’s article was translated from a foreign author. The translation is concise and clear, and specific examples are also given.

The purpose of writing this article is to make a more complete supplement to Brother Niao's article. After all, some students' foundation is not good enough, and they are confused.

What is a coroutine

First, let’s figure out what a coroutine is.

You may have heard of the concepts of "process" and "thread".

A process is a running instance of a binary executable file in computer memory, just like your .exe file is a class, and the process is the new instance.

The process is the basic unit of resource allocation and scheduling in the computer system (don’t worry about thread processes in the scheduling unit). Each CPU can only process one process at the same time.

The so-called concurrency just looks like the CPU can handle several things at the same time. For a single-core CPU, it is actually switching between different processes very quickly.

Process switching requires a system call. The CPU needs to save various information about the current process, and the CPUCache will also be destroyed.

So if you can't switch the process, don't do it unless you have to.

So how to achieve "If you can't switch processes, don't do it unless you have to"?

First of all, the conditions for the process to be switched are: the process execution is completed, the CPU time slice allocated to the process ends, an interrupt occurs in the system that needs to be processed, or the process is waiting for necessary resources (process blocking), etc. If you think about it, there is nothing to say in the previous situations, but if you are blocking and waiting, is it a waste?

In fact, if it blocks, there are other executable places for our program to execute, so we don’t have to wait stupidly!

So there are threads.

A simple understanding of a thread is a "micro-process" that specifically runs a function (logical flow).

So we can use threads to embody functions that can run simultaneously during the process of writing programs.

There are two types of threads, one is managed and scheduled by the kernel.

We say that as long as the kernel needs to participate in management and scheduling, the cost is very high. This kind of thread actually solves the problem that when a thread being executed in a process encounters a blockage, we can schedule another runnable thread to run, but it is still in the same process, so there is no process switching.

There is another kind of thread whose scheduling is managed by programmers writing programs themselves and is invisible to the kernel. This kind of thread is called a "user space thread".

Coroutine can be understood as a kind of user space thread.

Coroutines have several characteristics:

Collaboration, because it is a scheduling strategy written by the programmer himself, it switches through collaboration instead of preemption

When the user State completion creation, switching and destruction

⚠️ From a programming perspective, the idea of ​​coroutine is essentially the active yield and resume mechanism of control flow

generator often Used to implement coroutines

Having said this, you should understand the basic concept of coroutines, right?

PHP Implementation of Coroutines

Step by step, starting from explaining the concept!

Iterable object

PHP5 provides a way to define an object so that it can be traversed through a list of cells, such as using a foreach statement.

If you want to implement an iterable object, you have to implement the Iterator interface:

<?php
class MyIterator implements Iterator
{
    private $var = array();
    public function __construct($array)
    {
        if (is_array($array)) {
            $this->var = $array;
        }
    }
    public function rewind() {
        echo "rewinding\n";
        reset($this->var);
    }
    public function current() {
        $var = current($this->var);
        echo "current: $var\n";
        return $var;
    }
    public function key() {
        $var = key($this->var);
        echo "key: $var\n";
        return $var;
    }
    public function next() {
        $var = next($this->var);
        echo "next: $var\n";
        return $var;
    }
    public function valid() {
        $var = $this->current() !== false;
        echo "valid: {$var}\n";
        return $var;
    }
}
$values = array(1,2,3);
$it = new MyIterator($values);
foreach ($it as $a => $b) {
    print "$a: $b\n";
}

Generator

It can be said that in order to have an iterator that can For objects traversed by foreach, you have to implement a bunch of methods. The yield keyword is to simplify this process.

Generators provide an easier way to implement simple object iteration. Compared with defining a class to implement the Iterator interface, the performance overhead and complexity are greatly reduced.

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}
 
foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

Remember, if yield is used in a function, it is a generator. It is useless to call it directly. It cannot be executed like a function!

So, yield is yield. Next time whoever says yield is a coroutine, I will definitely treat you xxxx.

PHP Coroutine

As mentioned before when introducing coroutines, coroutines require programmers to write the scheduling mechanism themselves. Let’s take a look at how to write this mechanism.

Correct use of generators

Since the generator cannot be called directly like a function, how can it be called?

The method is as follows:

foreach him

send($value)

current / next...

Task Implementing

Task is the abstraction of a task. Just now we said that a coroutine is a user space thread. A thread can be understood as running a function.

So the Task constructor receives a closure function, which we name coroutine.

/**
 * Task任务类
 */
class Task
{
    protected $taskId;
    protected $coroutine;
    protected $beforeFirstYield = true;
    protected $sendValue;
    /**
     * Task constructor.
     * @param $taskId
     * @param Generator $coroutine
     */
    public function __construct($taskId, Generator $coroutine)
    {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
    /**
     * 获取当前的Task的ID
     * 
     * @return mixed
     */
    public function getTaskId()
    {
        return $this->taskId;
    }
    /**
     * 判断Task执行完毕了没有
     * 
     * @return bool
     */
    public function isFinished()
    {
        return !$this->coroutine->valid();
    }
    /**
     * 设置下次要传给协程的值,比如 $id = (yield $xxxx),这个值就给了$id了
     * 
     * @param $value
     */
    public function setSendValue($value)
    {
        $this->sendValue = $value;
    }
    /**
     * 运行任务
     * 
     * @return mixed
     */
    public function run()
    {
        // 这里要注意,生成器的开始会reset,所以第一个值要用current获取
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            // 我们说过了,用send去调用一个生成器
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
}

Scheduler实现

接下来就是Scheduler这个重点核心部分,他扮演着调度员的角色。

/**
 * Class Scheduler
 */
Class Scheduler
{
    /**
     * @var SplQueue
     */
    protected $taskQueue;
    /**
     * @var int
     */
    protected $tid = 0;
    /**
     * Scheduler constructor.
     */
    public function __construct()
    {
        /* 原理就是维护了一个队列,
         * 前面说过,从编程角度上看,协程的思想本质上就是控制流的主动让出(yield)和恢复(resume)机制
         * */
        $this->taskQueue = new SplQueue();
    }
    /**
     * 增加一个任务
     *
     * @param Generator $task
     * @return int
     */
    public function addTask(Generator $task)
    {
        $tid = $this->tid;
        $task = new Task($tid, $task);
        $this->taskQueue->enqueue($task);
        $this->tid++;
        return $tid;
    }
    /**
     * 把任务进入队列
     *
     * @param Task $task
     */
    public function schedule(Task $task)
    {
        $this->taskQueue->enqueue($task);
    }
    /**
     * 运行调度器
     */
    public function run()
    {
        while (!$this->taskQueue->isEmpty()) {
            // 任务出队
            $task = $this->taskQueue->dequeue();
            $res = $task->run(); // 运行任务直到 yield
            if (!$task->isFinished()) {
                $this->schedule($task); // 任务如果还没完全执行完毕,入队等下次执行
            }
        }
    }
}

这样我们基本就实现了一个协程调度器。

你可以使用下面的代码来测试:

<?php
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield; // 主动让出CPU的执行权
    }
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield; // 主动让出CPU的执行权
    }
}
 
$scheduler = new Scheduler; // 实例化一个调度器
$scheduler->addTask(task1()); // 添加不同的闭包函数作为任务
$scheduler->addTask(task2());
$scheduler->run();

关键说下在哪里能用得到PHP协程。

function task1() {
        /* 这里有一个远程任务,需要耗时10s,可能是一个远程机器抓取分析远程网址的任务,我们只要提交最后去远程机器拿结果就行了 */
        remote_task_commit();
        // 这时候请求发出后,我们不要在这里等,主动让出CPU的执行权给task2运行,他不依赖这个结果
        yield;
        yield (remote_task_receive());
        ...
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield; // 主动让出CPU的执行权
    }
}

这样就提高了程序的执行效率。

关于『系统调用』的实现,鸟哥已经讲得很明白,我这里不再说明。

协程堆栈

鸟哥文中还有一个协程堆栈的例子。

我们上面说过了,如果在函数中使用了yield,就不能当做函数使用。

所以你在一个协程函数中嵌套另外一个协程函数:

<?php
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}
 
function task() {
    echoTimes(&#39;foo&#39;, 10); // print foo ten times
    echo "---\n";
    echoTimes(&#39;bar&#39;, 5); // print bar five times
    yield; // force it to be a coroutine
}
 
$scheduler = new Scheduler;
$scheduler->addTask(task());
$scheduler->run();

这里的echoTimes是执行不了的!所以就需要协程堆栈。

不过没关系,我们改一改我们刚刚的代码。

把Task中的初始化方法改下,因为我们在运行一个Task的时候,我们要分析出他包含了哪些子协程,然后将子协程用一个堆栈保存。(C语言学的好的同学自然能理解这里,不理解的同学我建议去了解下进程的内存模型是怎么处理函数调用)

 /**
     * Task constructor.
     * @param $taskId
     * @param Generator $coroutine
     */
    public function __construct($taskId, Generator $coroutine)
    {
        $this->taskId = $taskId;
        // $this->coroutine = $coroutine;
        // 换成这个,实际Task->run的就是stackedCoroutine这个函数,不是$coroutine保存的闭包函数了
        $this->coroutine = stackedCoroutine($coroutine); 
    }

当Task->run()的时候,一个循环来分析:

/**
 * @param Generator $gen
 */
function stackedCoroutine(Generator $gen)
{
    $stack = new SplStack;
    // 不断遍历这个传进来的生成器
    for (; ;) {
        // $gen可以理解为指向当前运行的协程闭包函数(生成器)
        $value = $gen->current(); // 获取中断点,也就是yield出来的值
        if ($value instanceof Generator) {
            // 如果是也是一个生成器,这就是子协程了,把当前运行的协程入栈保存
            $stack->push($gen);
            $gen = $value; // 把子协程函数给gen,继续执行,注意接下来就是执行子协程的流程了
            continue;
        }
        // 我们对子协程返回的结果做了封装,下面讲
        $isReturnValue = $value instanceof CoroutineReturnValue; // 子协程返回`$value`需要主协程帮忙处理
        
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
            // 如果是gen已经执行完毕,或者遇到子协程需要返回值给主协程去处理
            $gen = $stack->pop(); //出栈,得到之前入栈保存的主协程
            $gen->send($isReturnValue ? $value->getValue() : NULL); // 调用主协程处理子协程的输出值
            continue;
        }
        $gen->send(yield $gen->key() => $value); // 继续执行子协程
    }
}

然后我们增加echoTime的结束标示:

class CoroutineReturnValue {
    protected $value;
 
    public function __construct($value) {
        $this->value = $value;
    }
     
    // 获取能把子协程的输出值给主协程,作为主协程的send参数
    public function getValue() {
        return $this->value;
    }
}
function retval($value) {
    return new CoroutineReturnValue($value);
}

然后修改echoTimes:

function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
    yield retval("");  // 增加这个作为结束标示
}

Task变为:

function task1()
{
    yield echoTimes(&#39;bar&#39;, 5);
}

这样就实现了一个协程堆栈,现在你可以举一反三了。

PHP7中yield from关键字

PHP7中增加了yield from,所以我们不需要自己实现携程堆栈,真是太好了。

把Task的构造函数改回去:

    public function __construct($taskId, Generator $coroutine)
    {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
        // $this->coroutine = stackedCoroutine($coroutine); //不需要自己实现了,改回之前的
    }

echoTimes函数:

function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}

task1生成器:

function task1()
{
    yield from echoTimes(&#39;bar&#39;, 5);
}

这样,轻松调用子协程。

建议不要使用PHP的Yield来实现协程,推荐使用swoole,2.0已经支持了协程,并附带了部分案例。

推荐教程:《PHP7教程

The above is the detailed content of Implementing coroutines in PHP7. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:segmentfault.com. If there is any infringement, please contact admin@php.cn delete