首頁 >後端開發 >php教程 >Swoft源碼之Swoole和Swoft的分析

Swoft源碼之Swoole和Swoft的分析

不言
不言原創
2018-07-23 10:28:036350瀏覽

這篇文章給大家分享的內容是關於Swoft 原始碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。

前言

Swoft的任務功能是基於SwooleTask機制,或說SwoftTask機製本質就是對SwooleTask機制的封裝與加強。

任務投遞

//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//见下文Command章节
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

任務投遞Task::deliver()將呼叫參數打包後根據$type參數透過Swoole$server->taskCo()$server->task()介面投遞到Task進程
Task本身總是同步執行的,$type只影響投遞這一操作的行為,Task::TYPE_ASYNC#對應的$ server->task()是異步投遞,Task::deliver()呼叫後馬上回傳;Task::TYPE_CO對應的$server-&gt ;taskCo()是協程投遞,投遞後讓出協程控制,任務完成或執行逾時後Task::deliver()才從協程返回。

任務執行

//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

此處是swoole.onTask的事件回調,其職責僅是將Worker進程投遞來的打包後的資料轉送給TaskExecutor

SwooleTask機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理,所以swoole.onTask的事件回呼是在Task進程中執行的。上文說過,Worker進程是你大部分HTTP服務程式碼執行的環境,但從TaskEventListener.onTask()方法開始,程式碼的執行環境都是Task程序,也就是說,TaskExecutor和具體的TaskBean都是執行在Task進程中的。

//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
    */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;


        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }
}

任務執行思路很簡單,將Worker進程發過來的資料解包還原成原來的呼叫參數,根據$name參數找到對應的TaskBean並呼叫其對應的task()方法。其中TaskBean使用類別層級註解@Task(name="TaskName")@Task("TaskName")宣告。

值得一提的一點是,@Task註解除了name屬性,還有一個coroutine屬性,上述程式碼會根據該參數選擇使用協程的runCoTask()或同步的runSyncTask()執行Task。但由於而且由於SwooleTask進程的執行是完全同步的,不支援協程,所以目前版本請該參數不要配置為true。同樣的在TaskBean中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級為同步阻塞的

從Process中投遞任務

前面我們提到:

SwooleTask機制的本質是Worker進程將耗時任務投遞給同步的Task程序(又名TaskWorker)處理。

換句話說,Swoole$server->taskCo()$server->task()都只能在Worker進程中使用。
這個限制大大的限制了使用場景。如何能夠為了能夠在Process中投遞任務呢? Swoft為了繞過這個限制提供了Task::deliverByProcess()方法。其實作原理也很簡單,透過Swoole$server->sendMessage()方法將呼叫資訊從Process##Worker進程中,然後由Worker進程替其投遞到Task進程當中,相關程式碼如下:

//Swoft\Task\Task.php
/**
 * Deliver task by process
 *
 * @param string $taskName
 * @param string $methodName
 * @param array  $params
 * @param string $type
 * @param int    $timeout
 * @param int    $workId
 *
 * @return bool
 */
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
    /* @var PipeMessageInterface $pipeMessage */
    $server      = App::$server->getServer();
    $pipeMessage = App::getBean(PipeMessage::class);
    $data = [
        'name'    => $taskName,
        'method'  => $methodName,
        'params'  => $params,
        'timeout' => $timeout,
        'type'    => $type,
    ];

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
    return $server->sendMessage($message, $workId);
}

資料打包後使用$server->sendMessage()投遞給Worker:

//Swoft\Bootstrap\Server\ServerTrait.php
/**
 * onPipeMessage event callback
 *
 * @param \Swoole\Server $server
 * @param int            $srcWorkerId
 * @param string         $message
 * @return void
 * @throws \InvalidArgumentException
 */
public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
{
    /* @var PipeMessageInterface $pipeMessage */
    $pipeMessage = App::getBean(PipeMessage::class);
    list($type, $data) = $pipeMessage->unpack($message);

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
}

$server->sendMessage後,Worker進程收到資料時會觸發一個swoole.pipeMessage事件的回調,Swoft會將其轉換成自己的swoft.pipeMessage事件並觸發.

//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) <p><code>swoft. pipeMessage</code>事件最終由<code>PipeMessageListener</code>處理。在相關的監聽其中,如果發現<code>swoft.pipeMessage</code>事件由<code>Task::deliverByProcess()</code>產生的,<code>Worker進程</code>會替其執行一次<code> Task::deliver()</code>,最後將任務資料投遞到<code>TaskWorker進程</code>。 </p><p>一道簡單的回顧練習:從<code>Task::deliverByProcess()</code>到某<code>TaskBean</code> 最終執行任務,經歷了哪些進程,而呼叫鏈的哪些部分又分別是在哪些進程中執行? </p><h3>从Command进程或其子进程中投递任务</h3><pre class="brush:php;toolbar:false">//Swoft\Task\QueueTask.php
/**
 * @param string $data
 * @param int    $taskWorkerId
 * @param int    $srcWorkerId
 *
 * @return bool
 */
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
    if ($taskWorkerId === null) {
        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
    }

    if ($srcWorkerId === null) {
        $srcWorkerId = mt_rand(0, $this->workerNum - 1);
    }

    $this->check();
    $data   = $this->pack($data, $srcWorkerId);
    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
    if (!$result) {
        return false;
    }

    return true;
}

对于Command进程的任务投递,情况会更复杂一点。
上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manager的子孙进程,他们能够拿到Swoole\Server的句柄变量,从而通过$server->sendMessage(),$server->task()等方法进行任务投递。

但在Swoft的体系中,还有一个十分路人的角色: Command
Command的进程从shellcronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到Swoole\Server的调用句柄直接通过UnixSocket进行任务投递的。
为了为这种进程提供任务投递支持,Swoft利用了SwooleTask进程的一个特殊功能----消息队列

Swoft源碼之Swoole和Swoft的分析

同一个项目中CommandHttp\RpcServer 通过约定一个message_queue_key获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展,如果你想使用,需要在编译PHP时加上--enable-sysvmsg参数。

定时任务

除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.

Swoft用两个前置Process---任务计划进程:CronTimerProcess和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable(任务(配置)表)OriginTable((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:

\\Swoft\Task\Crontab\TableCrontab.php
/**
 * 任务表,记录用户配置的任务信息
 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
 * @var array $originStruct 
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
];

/**
 * 执行表,记录短时间内要执行的任务列表及其执行状态
 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
 * @var array $runTimeStruct 
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//同上
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行)  1(已执行)  2(执行中) 三种。 
    //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
];

此处为何要使用Swoole的内存Table?

Swoft的的定时任务管理是分别由 任务计划进程任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指SwooleSwoole\Table结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。

为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比较简单,有兴趣的可以自行阅读。

背景介绍完了,我们来看看这两个定时任务进程的行为

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
/**
 * 初始化runTimeTable数据
 *
 * @param array $task        任务
 * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
 * @return bool
 */
private function initRunTimeTableData(array $task, array $parseResult): bool
{
    $runTimeTableTasks = $this->getRunTimeTable()->table;

    $min = date('YmdHi');
    $sec = strtotime(date('Y-m-d H:i'));
    foreach ($parseResult as $time) {
        $this->checkTaskQueue(false);
        $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
        $runTimeTableTasks->set($key, [
            'taskClass'  => $task['taskClass'],
            'taskMethod' => $task['taskMethod'],
            'minute'     => $min,
            'sec'        => $time + $sec,
            'runStatus'  => self::NORMAL
        ]);
    }

    return true;
}

CronTimerProcessSwoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
该进程使用了Swoole的定时器功能,通过Swoole\Timer在每分钟首秒时执行的回调,CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。

//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}

CronExecProcess作为定时任务的执行者,通过Swoole\Timer0.5s唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过sendMessage()投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。

定时任务的宏观执行情况如下:Swoft源碼之Swoole和Swoft的分析

相关推荐:

关于PHP中token的生成的解析

TP5中URL存取模式的解析

#

以上是Swoft源碼之Swoole和Swoft的分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn