Home >Backend Development >PHP Tutorial >php接收rabbitMQ消息并执行繁重任务

php接收rabbitMQ消息并执行繁重任务

WBOY
WBOYOriginal
2016-06-23 13:07:031131browse

1) 建立消息队列基础类

<?php/** * @desc 消息队列 * @author caifangjie * @date 2016/05/03 */class Queue{    //交换机名称    protected $_exchangeName = 'ex_auto_home';        //队列名称    protected $_queueName = 'qu_auto_home';        //路由    protected $_routeKey = 'ru_auto_home';        protected $_connectHandler;        protected $_channelObject;    protected $_exchangeObject;        protected $_queueObject;        //配置信息    protected $_config = array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest');    //构造函数,依次创建通道,交换机,队列    public function __construct()    {        try{            $this->_connectHandler = new AMQPConnection($this->_config);            if(!$this->_connectHandler->connect()) {                die('connect failed');            }            $this->createChannel();            $this->createExchange();            $this->createQueue();        } catch(Exception $e) {            echo $e->getMessage();        }    }        //创建通道    protected function createChannel()    {        $this->_channelObject = new AMQPChannel($this->_connectHandler);    }        //创建交换机    public function createExchange($exchangeName='', $exchangeType=AMQP_EX_TYPE_DIRECT)    {        $exName = $exchangeName?$exchangeName:$this->_exchangeName;        $this->_exchangeObject = new AMQPExchange($this->_channelObject);        $this->_exchangeObject->setName($exName);        $this->_exchangeObject->setType($exchangeType);        $this->_exchangeObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_exchangeObject->declareExchange();    }        //创建队列    public function createQueue()    {        $this->_queueObject = new AMQPQueue($this->_channelObject);        $this->_queueObject->setName($this->_queueName);        $this->_queueObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_queueObject->declareQueue();        $this->_queueObject->bind($this->_exchangeObject->getName(), $this->_routeKey);    }  }


2)有一个任务,会连续向队列中推送消息,累计起来,队列中会有大量的消息.....

3)客户端连续的接受队列中的消息,并执行相应的任务

<?phprequire_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                   // var_dump($requestUrl);                    $execHandler = new ExecProcess();                    $execHandler->start($requestUrl);                    $execHandler->execSave();                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


已知 require_once 'ExecProcess.class.php'; 这个类是没有问题的,单独执行可以通过,但是加到消息队列的客户端,接收消息,并执行一个繁重任务时,注:(php-cli模式下)执行时,客户端直接退出,无报错。

如果像下面这样时,则是可以正常运行,并打印队列中的消息的

<?php//require_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                    var_dump($requestUrl);//                    $execHandler = new ExecProcess();//                    $execHandler->start($requestUrl);//                    $execHandler->execSave();//                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();


也就是把ExecProcess.class.php'加进来时,接收消息的客户端,会自动退出,而且不会报错。。。

相反,去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....不知道是什么鬼

因为买的书,还没来得及看。

问题因该是很明显的,谁能给我一个思路,或者提示? 3Q


回复讨论(解决方案)

我觉得我很忧伤......这是要沉贴,翻船的节奏吗?

ExecProcess是不是出问题了什么

去掉require_once 'ExecProcess.class.php';并把处理消息的逻辑去掉,是可以把队列中的消息打印出来的.....

看你描述,应该是ExecProcess.class.php中,处理消息的部分出问题了,重点检查这部分代码,看看是什么异常。

执行繁重任务才出错,
可以检查是否执行超时导致。

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn