首頁  >  文章  >  php教程  >  RibbetMQ php扩展使用 实现队列生产消费

RibbetMQ php扩展使用 实现队列生产消费

WBOY
WBOY原創
2016-06-06 19:38:211257瀏覽

RibbetMQphp扩展使用实现队列生产消费 无 一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器http://www.rabbitmq.com/download.html默认的端口是55672 访问地址http://127.0.0.

RibbetMQ php扩展使用 实现队列生产消费
一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:
首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器
http://www.rabbitmq.com/download.html
默认的端口是55672   访问地址http://127.0.0.1:55672/
默认帐号密码   guest    guest
你可以看到rabbitmq 的管理界面



mq的任务是一个不浪费资源,的一个队列系统!

        php使用需要下载一个amqp扩展
         或者直接点击下面的地址找到适合自己的版本,下载
            http://pecl.php.net/package/amqp/1.2.0/windows

             
            rabbitmq.1.dll   放在C盘windows下
            php_amqp.dll    放入php扩展中
            开启php_amqp.dll的引用
            重启服务器
用phpinfo();
查看是否引用成功,如果出现以下的amqp扩展,那就说明成功了


首先是rabbitmq的生产者:
    创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储!
    接下来就需要跑数据了。
    createQueue(array('xxx','2222'),'test001');
    echo "ok";
     function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')
    {
        $queueName = self::getQueueName($queueName);
        $conn_args = array('host' =>'localhost', 'port'=> '5672',
            'login' =>'guest',        //mq帐号
            'password'=> '',        //mq密码
             'vhost' => '/');
        $conn = new AMQPConnection($conn_args);
        $conn->connect();
        $channel = new AMQPChannel($conn);
        if (!$exchangeName) {
            $exchangeName = $queueName;
        }
        $queueName = $queueName;
        if (!$queueKey) {
            $queueKey = $queueName;
        }
        $ex = new AMQPExchange($channel);
        $ex->setName($exchangeName);
        $ex->setType(AMQP_EX_TYPE_TOPIC);
        $ex->setFlags(AMQP_DURABLE); //exchange持久化
        $ex->declareExchange();
        $q = new AMQPQueue($channel);
        $q->setName($queueName);
        $q->setFlags(AMQP_DURABLE); //queue持久化
        $q->declareQueue();
        $q->bind($exchangeName, $queueKey);
        $channel->startTransaction();
        /**
         * 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。
         * 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php
         */
        $result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));
        $channel->commitTransaction();
        $conn->disconnect();
    }   

有了生产者,那就有消费者。
脚本如果没有其他的修改或问题,基本上都是常年启动的:

消费者基类:
        class WorkerCommand{
        function qInit($q_name,$e_name='',$k_route=''){
                $q_name = Utils::getQueueName($q_name);
                $conn_args = array(
                    'host' => '127.0.0.1',            //mq的配置
                    'port' => '5672',
                    'login' => 'guest',
                    'password' => 'huoxingxing',
                    'vhost' => '/'
                );
             
         
        //创建连接和channel
                $conn = new AMQPConnection($conn_args);
                if (!$conn->connect()) {
                    die("Cannot connect to the broker!\n");
                }
                $channel = new AMQPChannel($conn);
        //创建交换机
                $ex = new AMQPExchange($channel);
                if (!$e_name) {
                    $e_name = $q_name;
                }
                $ex->setName($e_name);
                $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
                $ex->setFlags(AMQP_DURABLE); //持久化
               // echo "Exchange Status:" . $ex->declareExchange() . "\n";
        //创建队列
                $q = new AMQPQueue($channel);
                $q->setName($q_name);
                $q->setFlags(AMQP_DURABLE); //持久化
               // echo "Message Total:" . $q->declareExchange() . "\n";
                if (!$k_route) {
                    $k_route = $q_name;
                }
        //绑定交换机与队列,并指定路由键
               // echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\n";
        //阻塞模式接收消息
                echo "Message:\n";
                while (True) {
                    $q->consume(array($this,'processMessage'));
                    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
                }
                $conn->disconnect();
        }
}     

消费者:
class WorkerWareSyncBackUpCommand extends WorkerCommand { 
    function actionIndex()
    {
        $this->qInit('SyncWareBackup');
    }
    function processMessage($envelope, $queue)
    {
        $msg = json_decode($envelope->getBody());
        Utils::doBackUp('back',$msg,'');
        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    }
}     
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn