>  기사  >  백엔드 개발  >  Rabbitmq 공통 함수 캡슐화(php 버전)

Rabbitmq 공통 함수 캡슐화(php 버전)

藏色散人
藏色散人앞으로
2021-04-12 11:44:333257검색

이 글은 모든 사람에게 RabbitMQ 공통 기능 캡슐화(PHP 버전)를 소개합니다. 필요한 친구는 모두에게 도움이 되기를 바랍니다.

rabbitmq가 프로젝트에서 널리 사용되었을 때, 다음은 Rabbitmq의 일반적인 기능을 간단하게 요약하고, 작곡가 패키지로 캡슐화한 것입니다. 작곡가 패키지 주소(https://packagist.org/packages/maweibinguo/easyrabbitmq), github 주소(https://github.com/maweibinguo/easyrabbitmq), 포크하셔도 좋습니다. 레벨 제한으로 인해 버그가 있을 수 있으니 소중한 의견 부탁드립니다

[추천 학습: PHP 비디오 튜토리얼]

easy-rabbitmq 패키지 소개

php-amqplib/php-amqplib 패키지의 2차 캡슐화로, 공통 기능을 위한 즉시 사용 가능한 제작 솔루션 세트를 제공합니다
. 현재 지원되는 기능 목록은 다음과 같습니다.

  • 직접 스위치에 메시지 푸시(지연 메시지 포함)
  • 섹터 스위치에 메시지 푸시(지연 메시지 포함)
  • 주제 스위치에 메시지 푸시(지연 메시지 포함)
  • Under 구독 모드 안정적인 소비, 소비자는 실패 후에도 최대 5번까지 소비를 계속하려고 시도합니다.
  • 풀 모드의 안정적인 소비, 소비자는 실패 후에도 최대 5번까지 소비를 계속하려고 시도합니다.

다른 장면이 있다면 나중에 계속 추가해서 반복해주세요! !

Requirements

  • 설치 패키지의 PHP 버전 요구 사항은 주로 php-amqplib/php-amqplib 패키지 자체의 요구 사항에 따라 달라집니다. php5.0 사용자를 고려하기 위해 php-amqplib를 사용합니다. /php-amqplib 패키지 V2.9.0 버전.
    구체적인 요구 사항은 여기(https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0)를 참조하세요.
    하지만 저자는 php7.0 이상 사용을 권장합니다. 이 개발 패키지도 버전 7.0에서 개발되었습니다!

Installation

      composer require maweibinguo/easyrabbitmq

Use

여기서는 소비 프로세스의 신뢰성을 보장하고 작업자의 소비 능력을 향상시키기 위해 PHP 스크립트 + 감독자의 조합을 권장합니다! Supervisord에 대해 들어본 적이 없다면 여기(http://www.supervisord.org/introduction.html)를 클릭하여 자세히 알아볼 수 있습니다.

1. 푸시 메시지

1-1. 연결된 스위치

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToDirect(
                        $msg = time(), //消息体内容
                        $exchange = "easy_direct_exchange", //交换机名称
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );

1-2. 섹터 스위치에 메시지 푸시

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange", //交换机名称
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToFanout(
                        $msg = time(), //消息体内容
                        $exchange = "easy_fanout_exchange" //交换机名称
      );

1-3. 메시지를 주제 스위치에 푸시합니다

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延迟消息,30 秒中后才会到达指定的交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持两种特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也可以是0个)
                         * 无论是bindingKey还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 包含三个单词easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延迟秒数
      );

      //无延迟,推入到指定到直链交换机
      $instance->pushToTopic(
                        $msg = time(), //消息体内容
                        $exchange = "easy_topic_exchange", //交换机名称
                        $routingKey = "easy.topic.queue"    
      );

2. 소비는 최대 5번의 재시도를 지원합니다. 소비가 실패할 때마다 메시지가 전송되어 소비 대기열에 다시 들어갑니다. 이 클라이언트에서 지원하는 재배치 전략은 다음과 같습니다.

실패 1회(1초 후에 다시 전달됨), 실패 2회(2초 후에 다시 전달됨) 초) ), 3번 실패(4초 후에 다시 전달됨), 4번 실패(8초 후에 다시 전달됨), 5번 실패(16초 후에 다시 전달됨)


2-1.

구독 모드에서 안정적인 소비

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//订阅的队列名称
            $consumerTag = "c1",//消费标记
            $exchange = "easy_direct_exchange",//交换机名称
            $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = "easymq@failed"
      );
2-2, 풀 모드

풀 모드에서 안정적인 소비

      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回结果不绝对等于(===)true,那么将触发消息重试机制
                return false;
            },
            //5次消费消费失败后,失败消息将会投递到的队列名称
            $failedQueue = 'easymq@failed'
      );

위 내용은 Rabbitmq 공통 함수 캡슐화(php 버전)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제