
Introduction to the PHP Yii2 queue shmilyzxt/yii2-queue

炎欲天舞
Original 2017-08-04 10:42:31
  • It has worked very well for me in practice. I suggest also reading up on how queues work.

  • shmilyzxt/yii2-queue, a brief explanation:

  1. I use the Yii2 advanced template. Let's start with the configuration and then look at the code. Here I use the MySQL (database) queue. First, the configuration file: I put the queue configuration items under the components array in common\config\main-local.php in the project root, and adjusted the database configuration there.

    After installing the package with composer, import these two SQL files into the database to create the queue table and the table that records failed tasks:
    vendor\shmilyzxt\yii2-queue\jobs\jobs.sql
    vendor\shmilyzxt\yii2-queue\failed\failed.sql
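For orientation, the queue component entry might look roughly like the sketch below. The property names connector, maxJob and expire appear in the library source quoted later in this article, and the class name is inferred from the file path; the shape of connector, the DSN, the credentials and every value are placeholder assumptions, so the package README remains the authoritative list of options.

```php
<?php
// common/config/main-local.php (sketch; values are placeholders, not package defaults)
return [
    'components' => [
        'queue' => [
            // Inferred from vendor\shmilyzxt\queue\queues\DatabaseQueue.php
            'class' => 'shmilyzxt\queue\queues\DatabaseQueue',
            // Assumption: connector is configured like a yii\db\Connection
            'connector' => [
                'class' => 'yii\db\Connection',
                'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                'username' => 'root',
                'password' => '',
                'charset' => 'utf8',
            ],
            // Used in the "max jobs number" check; the value here is arbitrary
            'maxJob' => 1000,
            // Read by pop(); treated here as a number of seconds (assumption)
            'expire' => 60,
        ],
    ],
];
```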

  2. Syntax for pushing a task:

    \Yii::$app->queue->pushOn(new SendMail(), ['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'], 'email');

    Let's open vendor\shmilyzxt\queue\queues\DatabaseQueue.php and look at the code. The pushOn() method is defined in the parent class of DatabaseQueue, vendor\shmilyzxt\queue\base\Queue.php:

    // Push a job onto the queue
    public function pushOn($job, $data = '', $queue = null)
    {
        // canPush checks whether the queue has reached its maximum number of jobs
        if ($this->canPush()) {
            // beforePush: event fired before the job is queued
            $this->trigger(self::EVENT_BEFORE_PUSH);
            // queue the job
            $ret = $this->push($job, $data, $queue);
            // afterPush: event fired after the job is queued
            $this->trigger(self::EVENT_AFTER_PUSH);
            return $ret;
        } else {
            throw new \Exception("max jobs number exceed! the max jobs number is {$this->maxJob}");
        }
    }

Note: to understand the trigger() calls here, it is worth taking a look at the Yii2 event class.
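To make the guard-then-trigger flow concrete, here is a minimal, framework-free sketch. TinyQueue, its on() method and its in-memory storage are hypothetical stand-ins rather than the library's classes; only the order of the steps follows pushOn().

```php
<?php
// Hypothetical stand-in for illustration; not part of shmilyzxt/yii2-queue.
class TinyQueue
{
    const EVENT_BEFORE_PUSH = 'beforePush';
    const EVENT_AFTER_PUSH = 'afterPush';

    /** @var int maximum number of stored jobs (always enforced in this sketch) */
    public $maxJob = 2;

    private $jobs = [];
    private $listeners = [];

    // Register a listener, loosely modelled on attaching handlers to a Yii2 component.
    public function on($event, callable $listener)
    {
        $this->listeners[$event][] = $listener;
    }

    private function trigger($event)
    {
        if (!isset($this->listeners[$event])) {
            return;
        }
        foreach ($this->listeners[$event] as $listener) {
            $listener($event);
        }
    }

    private function canPush()
    {
        return count($this->jobs) < $this->maxJob;
    }

    public function pushOn($job, $data = '', $queue = 'default')
    {
        if (!$this->canPush()) {
            throw new \RuntimeException("max jobs number exceeded: {$this->maxJob}");
        }
        $this->trigger(self::EVENT_BEFORE_PUSH);
        $this->jobs[] = ['job' => $job, 'data' => $data, 'queue' => $queue];
        $this->trigger(self::EVENT_AFTER_PUSH);
        return 1; // this sketch simply reports success as 1
    }
}

$log = [];
$queue = new TinyQueue();
$queue->on(TinyQueue::EVENT_BEFORE_PUSH, function ($event) use (&$log) { $log[] = $event; });
$queue->on(TinyQueue::EVENT_AFTER_PUSH, function ($event) use (&$log) { $log[] = $event; });

$result = $queue->pushOn('SendMail', ['email' => 'user@example.com'], 'email');
```

Listeners attached this way run immediately before and after the job is stored, which is the natural place for logging or metrics without touching the push logic itself.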

About enqueuing:

$this->push($job, $data, $queue);

Follow the queue class files and jump through the related functions: the data is processed and recorded in the database (call path: getQueue() --> createPayload() --> pushToDatabase()). pushOn() finally returns the result of the database insert; on success $ret is 1.

  3. Run the queue-processing command in the background, for example: php ./yii worker/listen default 10 128 3 0

where default is the name of the queue. For the email queue pushed to above, change it to email. After starting the command, let's look at the code. First the actionListen method of WorkerController is executed; following the code, we enter the listen method of vendor\shmilyzxt\queue\Worker.php, which simply loops forever, taking tasks off the queue and executing them:

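    /**
     * Start a background worker that listens on a queue
     * @param Queue $queue
     * @param string $queueName name of the queue to listen on (the worker must listen on the same queue the task was pushed to with pushOn, otherwise it will not receive it)
     * @param int $attempt number of attempts allowed for a failing task, 0 means unlimited
     * @param int $memory maximum memory the worker may use
     * @param int $sleep interval between checks for new tasks
     * @param int $delay delay passed to release() when a failed task is put back on the queue
     */
    public static function listen(Queue $queue, $queueName = 'default', $attempt = 10, $memory = 512, $sleep = 3, $delay = 0)
    {
        while (true) {
            try {
                // DatabaseQueue takes one available task (instance) from the database queue and updates it
                $job = $queue->pop($queueName);
            } catch (\Exception $e) {
                throw $e;
                continue; // never reached: the exception above is rethrown
            }
            if ($job instanceof Job) {
                // check whether the number of attempts exceeds the limit passed in
                if ($attempt > 0 && $job->getAttempts() > $attempt) {
                    $job->failed();
                } else {
                    try {
                        //throw new \Exception("test failed");
                        $job->execute();
                    } catch (\Exception $e) {
                        // execution failed: unless the task was deleted, put it back on the queue
                        if (! $job->isDeleted()) {
                            $job->release($delay);
                        }
                    }
                }
            } else {
                self::sleep($sleep);
            }
            if (self::memoryExceeded($memory)) {
                self::stop();
            }
        }
    }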
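The decision logic inside the loop can be tried out without a database. The sketch below is a bounded, in-memory imitation: FakeJob and runOnce() are hypothetical names, and pop(), sleeping and the memory check are left out. It keeps the same comparisons as listen(): a task whose attempt count exceeds $attempt is marked as failed, otherwise it is executed and, if it throws, released back to the queue.

```php
<?php
// Hypothetical in-memory job used only for this sketch.
class FakeJob
{
    public $state = 'reserved';
    private $attempts;
    private $work;

    public function __construct($attempts, callable $work)
    {
        $this->attempts = $attempts;
        $this->work = $work;
    }

    public function getAttempts() { return $this->attempts; }
    public function failed() { $this->state = 'failed'; }
    public function release($delay = 0) { $this->state = 'released'; }
    public function isDeleted() { return $this->state === 'deleted'; }

    public function execute()
    {
        call_user_func($this->work);
        $this->state = 'deleted'; // a finished job is removed from the queue
    }
}

// One pass of the loop body in listen(), without pop(), sleep or memory checks.
function runOnce(FakeJob $job, $attempt = 10, $delay = 0)
{
    if ($attempt > 0 && $job->getAttempts() > $attempt) {
        $job->failed();
    } else {
        try {
            $job->execute();
        } catch (\Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }
        }
    }
    return $job->state;
}

$ok = runOnce(new FakeJob(1, function () {}), 3);
$retry = runOnce(new FakeJob(1, function () { throw new \Exception('smtp down'); }), 3);
$dead = runOnce(new FakeJob(4, function () {}), 3);
```

Assuming the console command passes its arguments positionally in the order of the listen() signature, default 10 128 3 0 would correspond to $queueName, $attempt, $memory, $sleep and $delay.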

Note: $queue->pop($queueName) is the pop() method in vendor\shmilyzxt\queue\queues\DatabaseQueue.php. It runs its SQL inside a transaction and creates an instance of vendor\shmilyzxt\queue\jobs\DatabaseJob.php:

    // Take one task off the queue
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);
        if (!is_null($this->expire)) {
            //$this->releaseJobsThatHaveBeenReservedTooLong($queue);
        }
        $tran = $this->connector->beginTransaction();
        // check whether there is an available task to execute
        if ($job = $this->getNextAvailableJob($queue)) {
            $this->markJobAsReserved($job->id);
            $tran->commit();
            $config = array_merge($this->jobEvent, [
                'class' => 'shmilyzxt\queue\jobs\DatabaseJob',
                'queue' => $queue,
                'job' => $job,
                'queueInstance' => $this,
            ]);
            return \Yii::createObject($config);
        }
        $tran->commit();
        return false;
    }

$job->execute() is inherited by DatabaseJob from its parent class Job. Following the code, it fires an event through the trigger() method of yii\base\Component and then runs the task:

    /**
     * Execute the task
     */
    public function execute()
    {
        // beforeExecute: event fired before the task runs; JobEvent itself contains no executable code
        $this->trigger(self::EVENT_BEFORE_EXECUTE, new JobEvent(["job" => $this, 'payload' => $this->getPayload()]));
        $this->resolveAndFire(); // the method that actually runs the task
    }
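Returning to pop() for a moment: its reserve-inside-a-transaction step can be sketched with plain PDO. This is only an illustration under stated assumptions: it uses an in-memory SQLite database (so the pdo_sqlite extension is required), and the table layout, column names and popJob() helper are hypothetical, not those of jobs.sql or of the library.

```php
<?php
// Sketch only: table and column names are hypothetical, not those of jobs.sql.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    payload TEXT NOT NULL,
    reserved INTEGER NOT NULL DEFAULT 0
)');
$pdo->exec("INSERT INTO jobs (queue, payload) VALUES ('email', 'first'), ('email', 'second')");

// Reserve the oldest unreserved job of a queue, or return false if none is left.
function popJob(PDO $pdo, $queue)
{
    $pdo->beginTransaction();
    try {
        $select = $pdo->prepare(
            'SELECT id, payload FROM jobs WHERE queue = :queue AND reserved = 0 ORDER BY id LIMIT 1'
        );
        $select->execute([':queue' => $queue]);
        $job = $select->fetch(PDO::FETCH_ASSOC);

        if ($job !== false) {
            // Mark the row so the next pop does not pick it up again
            $update = $pdo->prepare('UPDATE jobs SET reserved = 1 WHERE id = :id');
            $update->execute([':id' => $job['id']]);
        }
        $pdo->commit();
        return $job;
    } catch (\Exception $e) {
        $pdo->rollBack();
        throw $e;
    }
}

$first = popJob($pdo, 'email');
$second = popJob($pdo, 'email');
$third = popJob($pdo, 'email');
```

With several workers on MySQL, a locking read such as SELECT ... FOR UPDATE would typically be needed as well so that two workers cannot reserve the same row; whether the library does this is not visible in the code quoted here.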

    /**
     * The method that actually runs the task (calls the handler's handle method)
     * @param  array $payload
     * @return void
     */
    protected function resolveAndFire()
    {
        $payload = $this->getPayload();
        $payload = unserialize($payload); // unserialize the data
        $type = $payload['type'];
        $class = $payload['job'];

        if ($type == 'closure' && ($closure = (new Serializer())->unserialize($class[1])) instanceof \Closure) {
            $this->handler = $this->getHander($class[0]);
            $this->handler->closure = $closure;
            $this->handler->handle($this, $payload['data']);
        } else if ($type == 'classMethod') {
            $payload['job'][0]->$payload['job'][1]($this, $payload['data']);
        } else if ($type == 'staticMethod') {
            $payload['job'][0]::$payload['job'][1]($this, $payload['data']);
        } else {
            // runs the handle($job, $data) method of the SendMail class
            $this->handler = $this->getHander($class);
            $this->handler->handle($this, $payload['data']);
        }
        // delete the task once it has been executed
        if (!$this->isDeletedOrReleased()) {
            $this->delete();
        }
    }
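The dispatch on the payload type can be reproduced in a few self-contained lines. In the sketch below, EchoHandler, Mailer, fire() and the 'handler' type label are hypothetical; the closure branch is omitted, and call_user_func() replaces the variable-method syntax, whose parsing changed in PHP 7.

```php
<?php
// Hypothetical handler; in the article this role is played by SendMail.
class EchoHandler
{
    public function handle($job, $data)
    {
        return 'handled ' . $data['email'];
    }
}

// Hypothetical class offering a static and an instance method as job targets.
class Mailer
{
    public static function send($job, $data)
    {
        return 'static ' . $data['email'];
    }

    public function deliver($job, $data)
    {
        return 'method ' . $data['email'];
    }
}

// Dispatch a serialized payload by its 'type' field (closure type omitted).
function fire($serialized, $job = null)
{
    $payload = unserialize($serialized);
    $target = $payload['job'];
    $data = $payload['data'];

    switch ($payload['type']) {
        case 'classMethod':  // [object, method name]
        case 'staticMethod': // [class name, method name]
            return call_user_func($target, $job, $data);
        default:             // a handler object exposing handle($job, $data)
            return $target->handle($job, $data);
    }
}

$data = ['email' => 'user@example.com'];
$a = fire(serialize(['type' => 'handler', 'job' => new EchoHandler(), 'data' => $data]));
$b = fire(serialize(['type' => 'staticMethod', 'job' => ['Mailer', 'send'], 'data' => $data]));
$c = fire(serialize(['type' => 'classMethod', 'job' => [new Mailer(), 'deliver'], 'data' => $data]));
```

Because the payload goes through unserialize(), it should only ever come from a trusted source such as the application's own jobs table.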

Finally we reach the handle($job, $data) method of the SendMail class. It receives the job object and the data that were pushed to the queue, and it is where our own processing logic goes.

    public function handle($job, $data)
    {
        if ($job->getAttempts() > 3) {
            $this->failed($job);
        }
        $payload = $job->getPayload();
        echo '<pre>';
        print_r($payload);
        // $payload is the task's data; once you have it you can send the email
        // TODO: send the email
    }
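One way the TODO could be filled in is sketched below. SendMailSketch and StubJob are hypothetical names, the class does not extend any library base class, and the actual sending is delegated to an injected callable so that the example sends nothing. It assumes $data carries the email, title and content keys used in the pushOn() call earlier.

```php
<?php
// Hypothetical sketch of a handler; not the library's base handler class.
class SendMailSketch
{
    public $failedJobs = [];
    private $transport;

    // $transport is any callable taking (to, subject, body); injected so nothing is really sent.
    public function __construct(callable $transport)
    {
        $this->transport = $transport;
    }

    public function handle($job, $data)
    {
        if ($job->getAttempts() > 3) {
            $this->failed($job);
            return false; // stop here instead of sending again
        }
        call_user_func($this->transport, $data['email'], $data['title'], $data['content']);
        return true;
    }

    public function failed($job)
    {
        $this->failedJobs[] = $job;
    }
}

// Minimal stand-in for the job object passed to handle().
class StubJob
{
    private $attempts;

    public function __construct($attempts)
    {
        $this->attempts = $attempts;
    }

    public function getAttempts()
    {
        return $this->attempts;
    }
}

$outbox = [];
$handler = new SendMailSketch(function ($to, $subject, $body) use (&$outbox) {
    $outbox[] = "$to: $subject";
});
$data = ['email' => 'user@example.com', 'title' => 'test', 'content' => 'email test'];

$sent = $handler->handle(new StubJob(1), $data);
$skipped = $handler->handle(new StubJob(4), $data);
```

Returning right after failed() is a deliberate choice in this sketch: without it, a task that has already exceeded its attempt limit would still go on to send the email.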
