How to use a simple and reliable RabbitMQ component package in PHP

醉折花枝作酒筹
2021-07-02 15:33:18

RabbitMQ is already widely used in our projects, so this article summarizes its routine features and wraps them in a Composer package. The package is published on Composer and GitHub, and forks are welcome. My experience is limited and bugs are inevitable, so feedback is appreciated.

easy-rabbitmq package introduction

easy-rabbitmq is a wrapper around the php-amqplib/php-amqplib package. It provides a set of common, production-oriented features that work out of the box. The currently supported features are:

  • Push messages to a direct exchange (including delayed messages)

  • Push messages to a fanout exchange (including delayed messages)

  • Push messages to a topic exchange (including delayed messages)

  • Reliable consumption in subscription mode: after a failed consumption the consumer retries, up to 5 times.

  • Reliable consumption in pull mode: after a failed consumption the consumer retries, up to 5 times.

If you need other scenarios, please suggest them and they will be added in later iterations.

Requirements

The PHP version required by this package depends mainly on the requirements of php-amqplib/php-amqplib itself. To accommodate PHP 5 users, it uses version 2.9.0 of php-amqplib/php-amqplib.

For the specific requirements, refer to the php-amqplib/php-amqplib package.

However, the author recommends PHP 7.0 or later. This package was itself developed on PHP 7.0.

Installation

      composer require maweibinguo/easyrabbitmq

Usage

We recommend running the PHP consumer scripts under supervisor. It keeps the consumption process reliable by restarting workers that exit, and it makes it easy to run several workers to increase consumption capacity. If you have not used supervisor before, see its documentation.

1. Push messages

1-1. Push messages to a direct exchange

      $config = [
          "host" => "127.0.0.1",
          "port" => "5672",
          "user" => "guest",
          "password" => "guest",
          "vhost" => "/",
          "channel_max_num" => 10,
      ];
      $instance = RabbitMq::getInstance($config);

      // Delayed message: it reaches the exchange only after 30 seconds
      $instance->pushToDirect(
          $msg = time(), // message body
          $exchange = "easy_direct_exchange", // exchange name
          $routingKey = "direct_test_queue", // routing key; the bindingKey passed to consume()/get() must match it
          $delaySec = 30 // delay in seconds
      );

      // No delay: push straight to the direct exchange
      $instance->pushToDirect(
          $msg = time(), // message body
          $exchange = "easy_direct_exchange", // exchange name
          $routingKey = "direct_test_queue" // routing key; the bindingKey passed to consume()/get() must match it
      );

1-2. Push messages to a fanout exchange

      $config = [
          "host" => "127.0.0.1",
          "port" => "5672",
          "user" => "guest",
          "password" => "guest",
          "vhost" => "/",
          "channel_max_num" => 10,
      ];
      $instance = RabbitMq::getInstance($config);

      // Delayed message: it reaches the exchange only after 30 seconds
      $instance->pushToFanout(
          $msg = time(), // message body
          $exchange = "easy_fanout_exchange", // exchange name
          $delaySec = 30 // delay in seconds
      );

      // No delay: push straight to the fanout exchange
      $instance->pushToFanout(
          $msg = time(), // message body
          $exchange = "easy_fanout_exchange" // exchange name
      );

1-3. Push messages to a topic exchange

      $config = [
          "host" => "127.0.0.1",
          "port" => "5672",
          "user" => "guest",
          "password" => "guest",
          "vhost" => "/",
          "channel_max_num" => 10,
      ];
      $instance = RabbitMq::getInstance($config);

      // Delayed message: it reaches the exchange only after 30 seconds
      $instance->pushToTopic(
          $msg = time(), // message body
          $exchange = "easy_topic_exchange", // exchange name
          /**
           * The routingKey must match the bindingKey passed to consume()/get().
           * A bindingKey supports two wildcards, "*" and "#":
           * "*" matches exactly one word, "#" matches zero or more words.
           * In both bindingKey and routingKey, each "."-separated segment is one word;
           * easy.topic.queue contains three words: easy, topic, queue.
           */
          $routingKey = "easy.topic.queue",
          $delaySec = 30 // delay in seconds
      );

      // No delay: push straight to the topic exchange
      $instance->pushToTopic(
          $msg = time(), // message body
          $exchange = "easy_topic_exchange", // exchange name
          $routingKey = "easy.topic.queue"
      );
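The wildcard rules described in the comment above can be illustrated with a small matcher. This is only a sketch of the matching rules as stated there; the function names `topicKeyMatches` and `matchWords` are hypothetical and are not part of easy-rabbitmq or php-amqplib. In practice the RabbitMQ broker performs this matching.

```php
<?php
// Hypothetical helper: does $routingKey satisfy $bindingKey under topic rules?
function topicKeyMatches($bindingKey, $routingKey)
{
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($pattern, 0, $words, 0);
}

// Recursive comparison of pattern words against routing-key words.
function matchWords(array $pattern, $p, array $words, $w)
{
    if ($p === count($pattern)) {
        // Pattern exhausted: match only if every word was consumed too.
        return $w === count($words);
    }
    if ($pattern[$p] === '#') {
        // "#" may absorb zero or more words; try every possible length.
        for ($next = $w; $next <= count($words); $next++) {
            if (matchWords($pattern, $p + 1, $words, $next)) {
                return true;
            }
        }
        return false;
    }
    if ($w === count($words)) {
        // Pattern still expects a word, but none are left.
        return false;
    }
    if ($pattern[$p] === '*' || $pattern[$p] === $words[$w]) {
        // "*" matches exactly one word; otherwise the words must be equal.
        return matchWords($pattern, $p + 1, $words, $w + 1);
    }
    return false;
}

var_dump(topicKeyMatches('easy.*.queue', 'easy.topic.queue')); // bool(true)
var_dump(topicKeyMatches('easy.#', 'easy.topic.queue'));       // bool(true)
var_dump(topicKeyMatches('easy.*', 'easy.topic.queue'));       // bool(false)
```

A queue bound with `easy.#` would therefore receive the `easy.topic.queue` messages pushed above, while one bound with `easy.*` would not.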

2. Consume messages

Consumption supports automatic retry, up to 5 times. After each failed consumption the message is put back on the consumption queue, and the retry delay grows with the number of failures. This client uses the following delay strategy:

  • 1st failure: redelivered after 1 second

  • 2nd failure: redelivered after 2 seconds

  • 3rd failure: redelivered after 4 seconds

  • 4th failure: redelivered after 8 seconds

  • 5th failure: redelivered after 16 seconds
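The delays listed above double with each failure, which is a plain exponential backoff. A minimal sketch of that schedule follows; the function name `retryDelaySeconds` and its `$maxRetries` parameter are hypothetical, and the package's internal implementation may differ.

```php
<?php
// Hypothetical helper reproducing the documented schedule: 1, 2, 4, 8, 16 seconds.
function retryDelaySeconds($failureCount, $maxRetries = 5)
{
    if ($failureCount < 1 || $failureCount > $maxRetries) {
        // Outside the documented schedule: no delay is defined.
        return null;
    }
    // Delay doubles on every failure: 2^(failureCount - 1).
    return 1 << ($failureCount - 1);
}

for ($n = 1; $n <= 5; $n++) {
    echo "failure {$n}: redelivered after " . retryDelaySeconds($n) . "s" . PHP_EOL;
}
```

Summed over all five retries, a message that keeps failing spends 31 seconds waiting in total, which is worth keeping in mind when choosing timeouts for downstream systems.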

2-1. Subscription mode

Reliable consumption in subscription mode:

      $config = [
          "host" => "127.0.0.1",
          "port" => "5672",
          "user" => "guest",
          "password" => "guest",
          "vhost" => "/",
          "channel_max_num" => 10,
      ];
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
          $queueName = "direct_test_queue", // name of the queue to subscribe to
          $consumerTag = "c1", // consumer tag
          $exchange = "easy_direct_exchange", // exchange name
          $bindingKey = "direct_test_queue", // binding key; for a direct exchange it must match the routingKey
          $callback = function($msg){
              $body = $msg->body;
              file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL, FILE_APPEND);
              // Any return value not strictly equal (===) to true triggers the retry mechanism.
              // Returning false here demonstrates a retry.
              return false;
          },
          // Queue that receives the message after 5 failed consumption attempts
          $failedQueue = "easymq@failed"
      );
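Because only a return value strictly equal to true counts as success, it can help to wrap the business logic so that the callback's result is always an explicit boolean. The sketch below is one way to do that; `strictCallback` is a hypothetical helper, not part of easy-rabbitmq, and a `stdClass` object stands in for the real message, of which only the `body` property is used in the examples above. How the package itself reacts to an exception thrown inside the callback is not described here, so the sketch converts exceptions to false.

```php
<?php
// Hypothetical wrapper: normalizes any handler result to a strict boolean.
function strictCallback(callable $handler)
{
    return function ($msg) use ($handler) {
        try {
            // Only a strict boolean true is reported as success.
            return $handler($msg) === true;
        } catch (Exception $e) {
            // Treat an exception as a failed consumption so a retry is triggered.
            return false;
        }
    };
}

// Stand-in for the message object; only the body property is assumed.
$msg = new stdClass();
$msg->body = 'demo';

$ok = strictCallback(function ($msg) { return true; });
$truthy = strictCallback(function ($msg) { return 1; }); // truthy, but not === true
$throws = strictCallback(function ($msg) { throw new RuntimeException('boom'); });

var_dump($ok($msg));     // bool(true)
var_dump($truthy($msg)); // bool(false)
var_dump($throws($msg)); // bool(false)
```

The returned closure could then be passed as the `$callback` argument of `consume()` or `get()`.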

2-2. Pull mode

Reliable consumption in pull mode:

      $config = [
          "host" => "127.0.0.1",
          "port" => "5672",
          "user" => "guest",
          "password" => "guest",
          "vhost" => "/",
          "channel_max_num" => 10,
      ];
      $instance = RabbitMq::getInstance($config);
      $instance->get(
          $queue = "get_queue", // name of the queue to pull from
          $exchange = "easy_fanout_exchange", // exchange name
          $bindingKey = "", // binding key; left empty for the fanout exchange
          $callback = function($msg){
              $body = $msg->body;
              file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL, FILE_APPEND);
              // Any return value not strictly equal (===) to true triggers the retry mechanism.
              // Returning false here demonstrates a retry.
              return false;
          },
          // Queue that receives the message after 5 failed consumption attempts
          $failedQueue = 'easymq@failed'
      );

Statement:
This article is reproduced from segmentfault.com. If there is any infringement, please contact admin@php.cn to have it removed.