Home  >  Article  >  Backend Development  >  rabbitmq common function encapsulation (php version)

rabbitmq common function encapsulation (php version)

藏色散人
藏色散人forward
2021-04-12 11:44:333255browse

This article introduces rabbitmq common function encapsulation (php version) to everyone. It has certain reference value. Friends who need it can refer to it. I hope it will be helpful to everyone.

When rabbitmq has been widely used in the project, here is a simple summary of the general functions of rabbitmq, and encapsulated into a composer package, the composer package address (https://packagist.org/packages/ maweibinguo/easyrabbitmq), github address (https://github.com/maweibinguo/easyrabbitmq), you are welcome to fork. Due to the limited level, bugs are inevitable, and your valuable opinions are welcome

[Recommended learning: PHP Video tutorial

easy-rabbitmq package introduction

Secondary encapsulation of the php-amqplib/php-amqplib package, providing a set of common functions that can be used out of the box Production solution
. The currently supported function list is as follows:

  • Push messages to directly connected switches (including delayed messages)
  • Push messages to sector switches (including delayed messages)
  • Push Messages to the topic switch (including delayed messages)
  • Reliable consumption in subscription mode. After the consumer fails to consume, it will try to continue consuming, up to 5 attempts.
  • Reliable consumption in pull mode, the consumer will try to continue consumption after failure, up to 5 attempts.

If there are other scenarios, please continue to add them and iterate later! !

Requirements

  • The PHP version requirements of the installation package mainly depend on the requirements of the php-amqplib/php-amqplib package itself. In order to take into account the users of php5.0, we use The version of php-amqplib/php-amqplib package V2.9.0 has been installed.
    For specific requirements, please refer here (https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0).
    However, the author recommends using php7.0 and above. This development package is also developed on version 7.0!

Installation

      composer require maweibinguo/easyrabbitmq

Use

Here we recommend using the php script supervisor in combination to ensure the reliability of the consumption process and enhance the consumption ability of the worker! If you haven’t heard of supervisor, you can click here (http://www.supervisord.org/introduction.html) to learn more.

1. Push message

1-1. Push Message to directly connected switch

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );

1-2, push message to sector switch

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange", //交换机名称
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange" //交换机名称
      );

1-3, push message to topic switch

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持两种特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也可以是0个)
                         * 无论是bindingKey还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        $routingKey = "easy.topic.queue"    
      );

2, consume message

Consumption supports automatic retry, with a maximum of 5 retries. After each consumption failure, the message will be put back into the consumption queue. The retry time will gradually pass as the number of failures increases. The delay strategy supported by this client is as follows:
Failure 1 time (it will be delivered again after 1 second), failure 2 times (it will be delivered again after 2 seconds) Delivered), failed 3 times (will be delivered again after 4 seconds), failed 4 times (will be delivered again after 8 seconds), failed 5 times (will be delivered again after 16 seconds)

2-1. Subscription mode

Reliable consumption under subscription mode
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//订阅的队列名称
            $consumerTag = "c1",//消费标记
            $exchange = "easy_direct_exchange",//交换机名称
            $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = "easymq@failed"
      );

2-2. Pull mode

Reliable consumption under pull mode
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = 'easymq@failed'
      );

The above is the detailed content of rabbitmq common function encapsulation (php version). For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:segmentfault.com. If there is any infringement, please contact admin@php.cn delete