Was dieser Artikel Ihnen bringt, ist der vollständige Code für die Implementierung von Nachrichtenwarteschlangen in PHP und RabbitMQ. Ich hoffe, dass er Ihnen als Referenz dienen wird.

Installieren Sie zuerst RabbitMQ entsprechend PHP. Hier verwenden wir php_amqp. Verschiedene Erweiterungsimplementierungen weisen subtile Unterschiede auf.
PHP-Erweiterungsadresse: http://pecl.php.net/package/amqp
Für Einzelheiten finden Sie auf der offiziellen Website http://www.rabbitmq.com/getstarted.html


BaseMQ.php MQ-Basisklasse
ProductMQ.php Producer-Klasse
ConsumerMQ.php Consumer-Klasse
Consumer2MQ.php Consumer 2 (kann mehrere haben)

    return [
        &#39;host&#39; => [
            'host' => '',
            'port' => '5672',
            'login' => 'guest',
            'password' => 'guest',
        'routes' => [],
     * Created by PhpStorm.
     * User: pc
     * Date: 2018/12/13
     * Time: 14:11
    namespace MyObjSummary\rabbitMQ;
    /** Member
     *      AMQPChannel
     *      AMQPConnection
     *      AMQPEnvelope
     *      AMQPExchange
     *      AMQPQueue
     * Class BaseMQ
     * @package MyObjSummary\rabbitMQ
    class BaseMQ
        /** MQ Channel
         * @var \AMQPChannel
        public $AMQPChannel ;
        /** MQ Link
         * @var \AMQPConnection
        public $AMQPConnection ;
        /** MQ Envelope
         * @var \AMQPEnvelope
        public $AMQPEnvelope ;
        /** MQ Exchange
         * @var \AMQPExchange
        public $AMQPExchange ;
        /** MQ Queue
         * @var \AMQPQueue
        public $AMQPQueue ;
        /** conf
         * @var
        public $conf ;
        /** exchange
         * @var
        public $exchange ;
        /** link
         * BaseMQ constructor.
         * @throws \AMQPConnectionException
        public function __construct()
            $conf =  require &#39;config.php&#39; ;
                throw new \AMQPConnectionException(&#39;config error!&#39;);
            $this->conf     = $conf['host'] ;
            $this->exchange = $conf['exchange'] ;
            $this->AMQPConnection = new \AMQPConnection($this->conf);
            if (!$this->AMQPConnection->connect())
                throw new \AMQPConnectionException("Cannot connect to the broker!\n");
         * close link
        public function close()
        /** Channel
         * @return \AMQPChannel
         * @throws \AMQPConnectionException
        public function channel()
            if(!$this->AMQPChannel) {
                $this->AMQPChannel =  new \AMQPChannel($this->AMQPConnection);
            return $this->AMQPChannel;
        /** Exchange
         * @return \AMQPExchange
         * @throws \AMQPConnectionException
         * @throws \AMQPExchangeException
        public function exchange()
            if(!$this->AMQPExchange) {
                $this->AMQPExchange = new \AMQPExchange($this->channel());
            return $this->AMQPExchange ;
        /** queue
         * @return \AMQPQueue
         * @throws \AMQPConnectionException
         * @throws \AMQPQueueException
        public function queue()
            if(!$this->AMQPQueue) {
                $this->AMQPQueue = new \AMQPQueue($this->channel());
            return $this->AMQPQueue ;
        /** Envelope
         * @return \AMQPEnvelope
        public function envelope()
            if(!$this->AMQPEnvelope) {
                $this->AMQPEnvelope = new \AMQPEnvelope();
            return $this->AMQPEnvelope;


    //生产者 P
    namespace MyObjSummary\rabbitMQ;
    require &#39;BaseMQ.php&#39;;
    class ProductMQ extends BaseMQ
        private $routes = [&#39;hello&#39;,&#39;word&#39;]; //路由key
         * ProductMQ constructor.
         * @throws \AMQPConnectionException
        public function __construct()
        /** 只控制发送成功 不接受消费者是否收到
         * @throws \AMQPChannelException
         * @throws \AMQPConnectionException
         * @throws \AMQPExchangeException
        public function run()
            $channel = $this->channel();
            $ex = $this->exchange();
            $message = 'product message '.rand(1,99999);
            $sendEd = true ;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route) ;
                echo "Send Message:".$sendEd."\n";
            if(!$sendEd) {
            $channel->commitTransaction(); //提交事务
            die ;
        (new ProductMQ())->run();
    }catch (\Exception $exception){
        var_dump($exception->getMessage()) ;
    //消费者 C
    namespace MyObjSummary\rabbitMQ;
    require &#39;BaseMQ.php&#39;;
    class ConsumerMQ extends BaseMQ
        private  $q_name = &#39;hello&#39;; //队列名
        private  $route  = &#39;hello&#39;; //路由key
         * ConsumerMQ constructor.
         * @throws \AMQPConnectionException
        public function __construct()
        /** 接受消息 如果终止 重连时会有消息
         * @throws \AMQPChannelException
         * @throws \AMQPConnectionException
         * @throws \AMQPExchangeException
         * @throws \AMQPQueueException
        public function run()
            $ex = $this->exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
            $ex->setFlags(AMQP_DURABLE); //持久化
            //echo "Exchange Status:".$ex->declare()."\n";
            $q = $this->queue();
            $q->setFlags(AMQP_DURABLE); //持久化
            //echo "Message Total:".$q->declareQueue()."\n";
            echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
            echo "Message:\n";
                $q->consume(function ($envelope,$queue){
                    $msg = $envelope->getBody();
                    echo $msg."\n"; //处理消息
                    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
                //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
        (new ConsumerMQ)->run();
    }catch (\Exception $exception){
        var_dump($exception->getMessage()) ;

