首頁  >  文章  >  後端開發  >  rabbitmq常見功能封裝(php版本)

rabbitmq常見功能封裝(php版本)

藏色散人
藏色散人轉載
2021-04-12 11:44:333259瀏覽

這篇文章跟大家介紹rabbitmq常見功能封裝(php版本),有一定的參考價值,需要的朋友可以參考一下,希望對大家有幫助。

在專案中rabbitmq得到了廣泛的時候,這裡對rabbitmq的常規功能做了一個簡單的總結,並封裝成了composer包,composer包地址(https://packagist.org/packages/ maweibinguo/easyrabbitmq)、github地址(https://github.com/maweibinguo/easyrabbitmq),歡迎fork,由於水平有限,難免存在bug,歡迎提出寶貴意見

#【推薦學習:PHP影片教學

easy-rabbitmq 套件簡介

#對php-amqplib/php-amqplib套件的二次封裝,為常見功能提供一套開箱即用的生產解決方案
。目前支援的功能清單如下:

  • 推播訊息到直連交換器(含延遲訊息)
  • 推播訊息到扇形交換器(含延遲訊息)
  • 推送訊息到主題交換器(含延遲訊息)
  • 訂閱模式下的可靠消費, 消費者消費失敗後將會嘗試繼續消費,最多嘗試5次。
  • 拉取模式下的可靠消費, 消費者消費失敗後將會嘗試繼續消費,最多嘗試5次。

如果還有其它場景,歡迎繼續補充,隨後進行迭代! !

要求

  • 安裝套件對PHP版本對要求主要取決於php-amqplib/php-amqplib套件本身對要求,這裡為了兼顧php5.0的使用者,我們使用了php-amqplib/php-amqplib包V2.9.0的版本。
    具體的要求參考這裡(https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0)。
    不過筆者推薦使用php7.0及其以上版本, 這個開發包也是在7.0這個版本上面開發完成的!

安裝

      composer require maweibinguo/easyrabbitmq

使用

在這裡我們推薦php腳本 supervisor結合使用,以確保消費進程的可靠性、增強worker的消費能力!如果你還沒聽過supervisor,可以點這裡(http://www.supervisord.org/introduction.html)了解.

1、推播訊息

1-1、推播訊息到直連交換器

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );

1-2、推播訊息到扇形交換器

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange", //交换机名称
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange" //交换机名称
      );

1-3、推播訊息到主題交換器

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持两种特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也可以是0个)
                         * 无论是bindingKey还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        $routingKey = "easy.topic.queue"    
      );

2、消費訊息

消費支援自動重試,最多嘗試重試5次,每次消費失敗後訊息將會重新投入消費隊列。重新的時間將會隨著失敗的次數增多逐漸推移,本客戶端支援的推移策略如下:
失敗1次(1秒鐘後會再被投遞), 失敗2次(2秒鐘後會再被投遞), 失敗3次(4秒鐘後會再被投遞), 失敗4次(8秒鐘後會再被投遞), 失敗5次(16秒鐘後會再被投遞)

#2-1、訂閱模式

訂閱模式下的可靠消費
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//订阅的队列名称
            $consumerTag = "c1",//消费标记
            $exchange = "easy_direct_exchange",//交换机名称
            $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = "easymq@failed"
      );

2-2、拉取模式

拉取模式下的可靠消費
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = 'easymq@failed'
      );

以上是rabbitmq常見功能封裝(php版本)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除