ホームページ  >  記事  >  バックエンド開発  >  SwooleとSwoftのSwoftソースコードの解析

SwooleとSwoftのSwoftソースコードの解析

不言
不言オリジナル
2018-07-23 10:28:036017ブラウズ

この記事の内容は、Swoole と Swoft のソースコード解析 (タスク配信/スケジュールされたタスク) について、Swoole と Swoft の概要を紹介するものであり、一定の参考価値があり、困っている友人が参照することができます。

はじめに

Swoft のタスク関数は、Swooleタスク メカニズム、または Swoft に基づいています。 Task メカニズムの本質は、SwooleTask メカニズムのカプセル化と拡張です。

タスク配信

//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//见下文Command章节
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

タスク配信Task::deliver()呼び出しパラメータをパッケージ化し、$typeパラメータに従って渡しますSwoole#$server->taskCo() または $server->task() インターフェイスは Task プロセス に配信されます。
Task 自体は常に同期的に実行されます。$type は配信操作の動作にのみ影響します。Task::TYPE_ASYNC$ サーバーに対応します->task() は非同期配信です。Task::deliver() は呼び出された直後に戻ります。Task::TYPE_CO$server-> に対応します。 ;taskCo() はコルーチンの配信です。配信後、コルーチンのコントロールは放棄されます。Task::deliver() は、タスクが完了するか実行がタイムアウトするまでコルーチンから戻りません。 。

タスク実行

//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

これは

swoole.onTask のイベント コールバックです。その役割は、配信された Worker プロセスをパッケージ化することだけです。データは次のとおりです。 TaskExecutorに転送されます。

SwooleTask メカニズムの本質は、Worker プロセス が時間のかかるタスクを同期された Task プロセスに配信することです。 (TaskWorker とも呼ばれます) 処理のため、swoole.onTask のイベント コールバックが Task プロセス で実行されます。前述したように、ワーカー プロセスは、HTTP サービス コードのほとんどが実行される環境ですが、TaskEventListener.onTask() メソッドから始まり、コードの実行環境 これらはすべて Task プロセス です。つまり、TaskExecutor と特定の TaskBeanTask プロセス ## で実行されます。 #。 <pre class="brush:php;toolbar:false">//Swoft\Task\TaskExecutor /**  * The task executor  *  * @Bean()  */ class TaskExecutor {     /**      * @param string $data      * @return mixed     */     public function run(string $data)     {         $data = TaskHelper::unpack($data);         $name   = $data['name'];         $type   = $data['type'];         $method = $data['method'];         $params = $data['params'];         $logid  = $data['logid'] ?? uniqid('', true);         $spanid = $data['spanid'] ?? 0;         $collector = TaskCollector::getCollector();         if (!isset($collector['task'][$name])) {             return false;         }         list(, $coroutine) = $collector['task'][$name];         $task = App::getBean($name);         if ($coroutine) {             $result = $this-&gt;runCoTask($task, $method, $params, $logid, $spanid, $name, $type);         } else {             $result = $this-&gt;runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);         }         return $result;     } }</pre>タスク実行のアイデアは非常にシンプルです。

ワーカー プロセス

によって送信されたデータを解凍し、元の呼び出しパラメーターに復元し、 に基づいて対応する ## を見つけます。 $name パラメータ.#TaskBean を指定し、対応する task() メソッドを呼び出します。このうち、TaskBean は、クラスレベルのアノテーション @Task(name="TaskName") または @Task("TaskName") を使用して宣言されます。 @Task

アノテーションは

name 属性を削除し、coroutine 属性も存在することに注意してください。上記のコードはTask を実行するには、コルーチンの runCoTask() または同期の runSyncTask() を使用することを選択します。ただし、Swooleタスク プロセス の実行は完全に同期であり、コルーチンをサポートしていないため、現在のバージョンではこのパラメーターを true に設定しないでください。 。同様に、TaskBean で記述されたタスク コードは、同期ブロッキングであるか、環境に応じて非同期ノンブロッキングとコルーチンを同期ブロッキングに自動的にダウングレードできる必要があります。 プロセスからタスクを配信します。

前述しました:

Swoole

Task メカニズムの本質は、Worker プロセスが時間のかかるタスクを実行することです。タスクを同期 Task Process (別名 TaskWorker) 処理に変換します。 つまり、Swoole

$server->taskCo() または $server->task() は、 ワーカープロセスで使用されます。 この制限により、使用シナリオが大幅に制限されます。 プロセスでタスクを配信するにはどうすればよいですか?
Swoftこの制限を回避するために、Task::deliverByProcess() メソッドが提供されています。実装原理も非常にシンプルで、呼び出し情報は Swoole の $server->sendMessage() メソッドを通じて Process から Worker プロセスに配信されます。 、その後、ワーカー プロセスがそれを タスク プロセス に配信します。関連するコードは次のとおりです:

//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}
データがパッケージ化された後、$ を使用しますserver->sendMessage()

Worker:

//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}
$server->sendMessage に配信した後、

Worker プロセス データを受信すると ## がトリガーされます。#swoole.pipeMessage イベント コールバック、Swoft はそれを独自の swoft.pipeMessage イベントとトリガーに変換します。 <pre class="brush:php;toolbar:false">//Swoft\Task\Event\Listeners\PipeMessageListener.php /**  * The pipe message listener  *  * @Listener(event=AppEvent::PIPE_MESSAGE)  */ class PipeMessageListener implements EventHandlerInterface {     /**      * @param \Swoft\Event\EventInterface $event      */     public function handle(EventInterface $event)     {         $params = $event-&gt;getParams();         if (count($params) </pre>swoft.pipeMessage イベントは最終的に

PipeMessageListener

によって処理されます。関連する監視において、swoft.pipeMessage イベントが Task::deliverByProcess() によって生成されたことが判明した場合、Worker プロセス はそれを 1 回実行します。 Task::deliver() は、最終的にタスク データを TaskWorker プロセス に配信します。 簡単な復習演習: Task::deliverByProcess() から特定の

TaskBean

の最終実行まで、どのようなプロセスが実行され、どの部分が実行されたのかを確認します。呼び出しチェーンはどのプロセスで実行されますか? <h3>从Command进程或其子进程中投递任务</h3> <pre class="brush:php;toolbar:false">//Swoft\Task\QueueTask.php /**  * @param string $data  * @param int    $taskWorkerId  * @param int    $srcWorkerId  *  * @return bool  */ public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) {     if ($taskWorkerId === null) {         $taskWorkerId = mt_rand($this-&gt;workerNum + 1, $this-&gt;workerNum + $this-&gt;taskNum);     }     if ($srcWorkerId === null) {         $srcWorkerId = mt_rand(0, $this-&gt;workerNum - 1);     }     $this-&gt;check();     $data   = $this-&gt;pack($data, $srcWorkerId);     $result = \msg_send($this-&gt;queueId, $taskWorkerId, $data, false);     if (!$result) {         return false;     }     return true; }</pre> <p>对于<code>Command进程的任务投递,情况会更复杂一点。
上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manager的子孙进程,他们能够拿到Swoole\Server的句柄变量,从而通过$server->sendMessage(),$server->task()等方法进行任务投递。

但在Swoft的体系中,还有一个十分路人的角色: Command
Command的进程从shellcronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到Swoole\Server的调用句柄直接通过UnixSocket进行任务投递的。
为了为这种进程提供任务投递支持,Swoft利用了SwooleTask进程的一个特殊功能----消息队列

SwooleとSwoftのSwoftソースコードの解析

同一个项目中CommandHttp\RpcServer 通过约定一个message_queue_key获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展,如果你想使用,需要在编译PHP时加上--enable-sysvmsg参数。

定时任务

除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.

Swoft用两个前置Process---任务计划进程:CronTimerProcess和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable(任务(配置)表)OriginTable((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:

\\Swoft\Task\Crontab\TableCrontab.php
/**
 * 任务表,记录用户配置的任务信息
 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
 * @var array $originStruct 
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
];

/**
 * 执行表,记录短时间内要执行的任务列表及其执行状态
 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
 * @var array $runTimeStruct 
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//同上
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行)  1(已执行)  2(执行中) 三种。 
    //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
];

此处为何要使用Swoole的内存Table?

Swoft的的定时任务管理是分别由 任务计划进程任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指SwooleSwoole\Table结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。

为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比较简单,有兴趣的可以自行阅读。

背景介绍完了,我们来看看这两个定时任务进程的行为

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 * 初始化runTimeTable数据
 *
 * @param array $task        任务
 * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}

CronTimerProcessSwoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
该进程使用了Swoole的定时器功能,通过Swoole\Timer在每分钟首秒时执行的回调,CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。

//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}

CronExecProcess作为定时任务的执行者,通过Swoole\Timer0.5s唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过sendMessage()投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。

定时任务的宏观执行情况如下:SwooleとSwoftのSwoftソースコードの解析

相关推荐:

关于PHP中token的生成的解析

TP5 での URL アクセス パターンの解析

以上がSwooleとSwoftのSwoftソースコードの解析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。