이 기사의 내용은 Swoft 소스 코드 분석(작업 전달/예약 작업)에 대해 Swoole 및 Swoft에 대한 소개를 공유하는 것입니다. 필요한 친구가 참조할 수 있습니다.
Swoft
의 작업 기능은 Swoole
의 작업 메커니즘
을 기반으로 합니다. >, 즉 Swoft
의 Task
메커니즘은 본질적으로 Swoole
의 Task 메커니즘
을 캡슐화하고 향상시킨 것입니다. >. Swoft
的任务功能基于Swoole
的Task机制
,或者说Swoft
的Task
机制本质就是对Swoole
的Task机制
的封装和加强。
//Swoft\Task\Task.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//见下文Command章节 } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException('Please deliver task by http!'); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = 'task' . '.' . $taskName . '.' . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } }
任务投递Task::deliver()
将调用参数打包后根据$type
参数通过Swoole
的$server->taskCo()
或$server->task()
接口投递到Task进程
。Task
本身始终是同步执行的,$type
仅仅影响投递这一操作的行为,Task::TYPE_ASYNC
对应的$server->task()
是异步投递,Task::deliver()
调用后马上返回;Task::TYPE_CO
对应的$server->taskCo()
是协程投递,投递后让出协程控制,任务完成或执行超时后Task::deliver()
才从协程返回。
//Swoft\Task\Bootstrap\Listeners\TaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */ class TaskEventListener implements TaskInterface, FinishInterface { /** * @param \Swoole\Server $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws \InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (\Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
此处是swoole.onTask
的事件回调,其职责仅仅是将将Worker
进程投递来的打包后的数据转发给TaskExecutor
。
Swoole
的Task
机制的本质是Worker进程
将耗时任务投递给同步的Task进程
(又名TaskWorker
)处理,所以swoole.onTask
的事件回调是在Task进程
中执行的。上文说过,Worker进程
是你大部分HTTP
服务代码执行的环境,但是从TaskEventListener.onTask()
方法开始,代码的执行环境都是Task进程
,也就是说,TaskExecutor
和具体的TaskBean
都是执行在Task进程
中的。
//Swoft\Task\TaskExecutor /** * The task executor * * @Bean() */ class TaskExecutor { /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data); $name = $data['name']; $type = $data['type']; $method = $data['method']; $params = $data['params']; $logid = $data['logid'] ?? uniqid('', true); $spanid = $data['spanid'] ?? 0; $collector = TaskCollector::getCollector(); if (!isset($collector['task'][$name])) { return false; } list(, $coroutine) = $collector['task'][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); } return $result; } }
任务执行思路很简单,将Worker进程
发过来的数据解包还原成原来的调用参数,根据$name
参数找到对应的TaskBean
并调用其对应的task()
方法。其中TaskBean
使用类级别注解@Task(name="TaskName")
或者@Task("TaskName")
声明。
值得一提的一点是,@Task
注解除了name
属性,还有一个coroutine
属性,上述代码会根据该参数选择使用协程的runCoTask()
或者同步的runSyncTask()
执行Task
。但是由于而且由于Swoole
的Task进程
的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为true
。同样的在TaskBean
中编写的任务代码必须的同步阻塞的或者是要能根据环境自动将异步非阻塞和协程降级为同步阻塞的
前面我们提到:
Swoole
的Task
机制的本质是Worker进程
将耗时任务投递给同步的Task进程
(又名TaskWorker
)处理。
换句话说,Swoole
的$server->taskCo()
或$server->task()
都只能在Worker进程
中使用。
这个限制大大的限制了使用场景。 如何能够为了能够在Process
中投递任务呢?Swoft
为了绕过这个限制提供了Task::deliverByProcess()
方法。其实现原理也很简单,通过Swoole
的$server->sendMessage()
方法将调用信息从Process
中投递到Worker进程
中,然后由Worker进程替其投递到Task进程
当中,相关代码如下:
//Swoft\Task\Task.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName, 'method' => $methodName, 'params' => $params, 'timeout' => $timeout, 'type' => $type, ]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
数据打包后使用$server->sendMessage()
投递给Worker
:
//Swoft\Bootstrap\Server\ServerTrait.php /** * onPipeMessage event callback * * @param \Swoole\Server $server * @param int $srcWorkerId * @param string $message * @return void * @throws \InvalidArgumentException */ public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
$server->sendMessage
后,Worker进程
收到数据时会触发一个swoole.pipeMessage
事件的回调,Swoft
会将其转换成自己的swoft.pipeMessage
事件并触发.
//Swoft\Task\Event\Listeners\PipeMessageListener.php /** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */ class PipeMessageListener implements EventHandlerInterface { /** * @param \Swoft\Event\EventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) <p><code>swoft.pipeMessage</code>事件最终由<code>PipeMessageListener</code>处理。在相关的监听其中,如果发现<code>swoft.pipeMessage</code>事件由<code>Task::deliverByProcess()</code>产生的,<code>Worker进程</code>会替其执行一次<code>Task::deliver()</code>,最终将任务数据投递到<code>TaskWorker进程</code>中。</p><p>一道简单的回顾练习:从<code>Task::deliverByProcess()</code>到某<code>TaskBean</code></p>Task 전달#🎜🎜#<pre class="brush:php;toolbar:false">//Swoft\Task\QueueTask.php /** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */ public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); } if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); } $this->check(); $data = $this->pack($data, $srcWorkerId); $result = \msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; } return true; }#🎜🎜#Task 전달
Task::deliver()
는 $type
에 따라 호출 매개변수를 패키징합니다. Swoole$server->taskCo()
또는 $server->task()
인터페이스를 통해 작업 프로세스 /코드>. <br><code>Task
자체는 항상 동기식으로 실행됩니다. $type
은 Task::TYPE_ASYNC
에 해당하는 동작에만 영향을 미칩니다. code>$server->task()는 비동기 전달이며, Task::deliver()
는 호출된 후 즉시 반환됩니다. Task::TYPE_CO
는 $server->taskCo()는 코루틴 전달입니다. 전달 후에는 코루틴 제어가 포기됩니다. Task::deliver()
는 코루틴으로 반환되지 않습니다. 작업이 완료되거나 Cheng이 반환됩니다. #🎜🎜##🎜🎜#Task Execution#🎜🎜#\Swoft\Task\Crontab\TableCrontab.php /** * 任务表,记录用户配置的任务信息 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录 * @var array $originStruct */ private $originStruct = [ 'rule' => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名) 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法 'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳 ]; /** * 执行表,记录短时间内要执行的任务列表及其执行状态 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录 * @var array $runTimeStruct */ private $runTimeStruct = [ 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上 'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi') 'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳 'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。 //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。 ];#🎜🎜#다음은
swoole.onTask
의 이벤트 콜백입니다. 해당 책임은 Worker
전송뿐입니다. 프로세스에 의해 전달된 패키지 데이터는 TaskExecutor
로 전달됩니다. #🎜🎜##🎜🎜#Swoole
의 작업
메커니즘의 핵심은 작업자 프로세스
가 시간이 많이 걸리는 작업을 동기화된 프로세스에 전달한다는 것입니다. 작업 프로세스
(TaskWorker
라고도 함)가 처리되므로 swoole.onTask
의 이벤트 콜백이 작업 프로세스. 위에서 언급한 것처럼 <code>Worker 프로세스
는 대부분의 HTTP
서비스 코드가 실행되는 환경이지만 TaskEventListener.onTask()
메서드에서 시작됩니다. , 코드 실행 환경은 모두 Task 프로세스
입니다. 즉, TaskExecutor
와 특정 TaskBean
이 Task에서 실행됩니다. 프로세스
> in. #🎜🎜#//Swoft\Task\Bootstrap\Process\CronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer", boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param \Swoft\Process\Process $process */ public function run(SwoftProcess $process) { //code.... /* @var \Swoft\Task\Crontab\Crontab $cron*/ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date('s')) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } }#🎜🎜#작업 실행 아이디어는 매우 간단합니다.
작업자 프로세스
에서 보낸 데이터를 압축 해제하고 이를 원래 호출 매개변수로 복원합니다. $name
매개변수는 해당 TaskBean
이며 해당 task()
메서드를 호출합니다. 그 중 TaskBean
은 클래스 수준 주석 @Task(name="TaskName")
또는 @Task("TaskName")
을 사용하여 선언됩니다. #🎜🎜##🎜🎜#@Task
주석은 name
속성을 제거하고 coroutine
속성도 포함한다는 점을 언급할 가치가 있습니다. 위에서 언급한 코드는 코루틴의 runCoTask()
또는 동기식 runSyncTask()
를 사용하여 이 매개변수를 기반으로 Task
를 실행하도록 선택합니다. 그러나 Swoole
의 작업 프로세스
실행은 완전히 동기식이며 코루틴을 지원하지 않으므로 다음에서 이 매개변수를 true
로 구성하지 마세요. 현재 버전. >. 마찬가지로 TaskBean
에 작성된 작업 코드는 동기식 차단이거나 #🎜🎜##🎜🎜#즉,Swoole
의Task
메커니즘은 기본적으로작업자 프로세스
코드입니다. >시간이 많이 걸리는 작업을 동기화된작업 프로세스
(TaskWorker
라고도 함)에 전달하여 처리하세요.
Swoole
의 $server->taskCo()
또는 $server->task() code>는 작업자 프로세스
에서만 사용할 수 있습니다.
이 제한은 사용 시나리오를 크게 제한합니다. 프로세스
에서 작업을 어떻게 전달하나요? Swoft
는 이러한 제한을 우회하기 위해 Task::deliverByProcess()
메서드를 제공합니다. 구현 원리도 매우 간단합니다. 호출 정보는 Swoole$server->sendMessage()
메소드를 통해 Process
에서 로 전달됩니다. /code>.>Worker 프로세스
, 그러면 Worker 프로세스가 이를 태스크 프로세스
에 전달합니다. 관련 코드는 다음과 같습니다. #🎜🎜#//Swoft\Task\Crontab\Crontab.php
/**
* 初始化runTimeTable数据
*
* @param array $task 任务
* @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
* @return bool
*/
private function initRunTimeTableData(array $task, array $parseResult): bool
{
$runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi');
$sec = strtotime(date('Y-m-d H:i'));
foreach ($parseResult as $time) {
$this->checkTaskQueue(false);
$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
$runTimeTableTasks->set($key, [
'taskClass' => $task['taskClass'],
'taskMethod' => $task['taskMethod'],
'minute' => $min,
'sec' => $time + $sec,
'runStatus' => self::NORMAL
]);
}
return true;
}
#🎜🎜#데이터 뒤에 패키지되어 있으면 $server ->sendMessage()
를 사용하여 Worker
:#🎜🎜#//Swoft\Task\Bootstrap\Process
/**
* Crontab process
*
* @Process(name="cronExec", boot=true)
*/
class CronExecProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
$pname = App::$server->getPname();
$process->name(sprintf('%s cronexec process', $pname));
/** @var \Swoft\Task\Crontab\Crontab $cron */
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) {
$tasks = $cron->getExecTasks();
if (!empty($tasks)) {
foreach ($tasks as $task) {
// Diliver task
Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
$cron->finishTask($task['key']);
}
}
});
}
}
#🎜🎜#$server->sendMessage, Worker 프로세스
데이터를 수신하면 swoole.pipeMessage
이벤트의 콜백이 트리거되고 Swoft
가 이를 다음으로 변환합니다. 자체 swoft.pipeMessage
이벤트 및 Trigger.#🎜🎜#rrreee#🎜🎜#swoft.pipeMessage
이벤트는 궁극적으로 PipeMessageListener
에 의해 처리됩니다. 관련 모니터링에서 Task::deliverByProcess()
에 의해 swoft.pipeMessage
이벤트가 생성된 것으로 확인되면 Worker 프로세스
는 Task::deliver()
를 한 번 실행하고 마지막으로 TaskWorker 프로세스
에 작업 데이터를 전달합니다. #🎜🎜##🎜🎜#간단한 검토 연습: Task::deliverByProcess()
부터 TaskBean
에 의한 작업의 최종 실행까지 어떤 프로세스를 경험했는지 , 그리고 콜 체인의 어떤 부분이 어떤 프로세스에서 실행됩니까? #🎜🎜#从Command进程或其子进程中投递任务
//Swoft\Task\QueueTask.php
/**
* @param string $data
* @param int $taskWorkerId
* @param int $srcWorkerId
*
* @return bool
*/
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
if ($taskWorkerId === null) {
$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
}
if ($srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this->workerNum - 1);
}
$this->check();
$data = $this->pack($data, $srcWorkerId);
$result = \msg_send($this->queueId, $taskWorkerId, $data, false);
if (!$result) {
return false;
}
return true;
}
对于Command
进程的任务投递,情况会更复杂一点。
上文提到的Process
,其往往衍生于Http/Rpc
服务,作为同一个Manager
的子孙进程,他们能够拿到Swoole\Server
的句柄变量,从而通过$server->sendMessage()
,$server->task()
等方法进行任务投递。
但在Swoft
的体系中,还有一个十分路人的角色: Command
。
Command
的进程从shell
或cronb
独立启动,和Http/Rpc
服务相关的进程没有亲缘关系。因此Command
进程以及从Command
中启动的Process
进程是没有办法拿到Swoole\Server
的调用句柄直接通过UnixSocket
进行任务投递的。
为了为这种进程提供任务投递支持,Swoft
利用了Swoole
的Task进程
的一个特殊功能----消息队列。
同一个项目中Command
和Http\RpcServer
通过约定一个message_queue_key
获取到系统内核中的同一条消息队列,然后Comand
进程就可以通过该消息队列向Task进程
投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()
方法中,Swoft
会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore
拓展,如果你想使用,需要在编译PHP
时加上--enable-sysvmsg
参数。
定时任务
除了手动执行的普通任务,Swoft
还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab
功能.
Swoft
用两个前置Process
---任务计划进程:CronTimerProcess
和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable
(任务(配置)表)OriginTable
((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:
\\Swoft\Task\Crontab\TableCrontab.php
/**
* 任务表,记录用户配置的任务信息
* 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
* @var array $originStruct
*/
private $originStruct = [
'rule' => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
];
/**
* 执行表,记录短时间内要执行的任务列表及其执行状态
* 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
* @var array $runTimeStruct
*/
private $runTimeStruct = [
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。
//注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
];
此处为何要使用Swoole的内存Table?
Swoft
的的定时任务管理是分别由 任务计划进程 和 任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()
等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table
(本文的Table
,除非特别说明,都指Swoole
的Swoole\Table
结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。
为了Table
能够在两个进程间共同使用,Table
必须在Swoole Server
启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()
中,比较简单,有兴趣的可以自行阅读。
背景介绍完了,我们来看看这两个定时任务进程的行为
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
* Crontab timer process
*
* @Process(name="cronTimer", boot=true)
*/
class CronTimerProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
//code....
/* @var \Swoft\Task\Crontab\Crontab $cron*/
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$time = (60 - date('s')) * 1000;
$server->after($time, function () use ($server, $cron) {
// Every minute check all tasks, and prepare the tasks that next execution point needs
$cron->checkTask();
$server->tick(60 * 1000, function () use ($cron) {
$cron->checkTask();
});
});
}
}
//Swoft\Task\Crontab\Crontab.php
/**
* 初始化runTimeTable数据
*
* @param array $task 任务
* @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
* @return bool
*/
private function initRunTimeTableData(array $task, array $parseResult): bool
{
$runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi');
$sec = strtotime(date('Y-m-d H:i'));
foreach ($parseResult as $time) {
$this->checkTaskQueue(false);
$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
$runTimeTableTasks->set($key, [
'taskClass' => $task['taskClass'],
'taskMethod' => $task['taskMethod'],
'minute' => $min,
'sec' => $time + $sec,
'runStatus' => self::NORMAL
]);
}
return true;
}
CronTimerProcess
是Swoft
的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
。
该进程使用了Swoole
的定时器功能,通过Swoole\Timer
在每分钟首秒时执行的回调,CronTimerProcess
每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。
//Swoft\Task\Bootstrap\Process
/**
* Crontab process
*
* @Process(name="cronExec", boot=true)
*/
class CronExecProcess implements ProcessInterface
{
/**
* @param \Swoft\Process\Process $process
*/
public function run(SwoftProcess $process)
{
$pname = App::$server->getPname();
$process->name(sprintf('%s cronexec process', $pname));
/** @var \Swoft\Task\Crontab\Crontab $cron */
$cron = App::getBean('crontab');
// Swoole/HttpServer
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) {
$tasks = $cron->getExecTasks();
if (!empty($tasks)) {
foreach ($tasks as $task) {
// Diliver task
Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
$cron->finishTask($task['key']);
}
}
});
}
}
CronExecProcess
作为定时任务的执行者,通过Swoole\Timer
每0.5s
唤醒自身一次,然后把 执行表
遍历一次,挑选当下需要执行的任务,通过sendMessage()
投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程
中由TaskExecutor
处理。
定时任务的宏观执行情况如下:
相关推荐:
위 내용은 Swooft의 Swoole 및 Swoft 소스 코드 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!