Home >Backend Development >PHP Tutorial >Detailed explanation of the implementation and application of PHP message queue (with flow chart)

Detailed explanation of the implementation and application of PHP message queue (with flow chart)

藏色散人
藏色散人forward
2022-10-27 16:00:095372browse

The concept, principle and implementation of message queue

Concept

  • A middleware for the queue structure
  • No need to consume messages immediately
  • Consumed by consumers or subscribers in sequence

The basic flow chart is as follows

  • Process
    Detailed explanation of the implementation and application of PHP message queue (with flow chart)

Application scenario

  • Redundancy
  • Decoupling
  • Traffic peak cutting
  • Asynchronous communication

Implementation method

  • mysql: reliable, slow
  • redis: fast, slow to process large message packets
  • Message system: reliable and professional

Message triggering mechanism

  • Infinite loop method, unable to recover in time in case of failure
  • Scheduled tasks: pressure equalization, but there is an upper limit on processing capacity
  • Daemon process method

Decoupling (order and delivery system)

  • Architecture design 1 using scheduled tasks
    Detailed explanation of the implementation and application of PHP message queue (with flow chart)

  • When using the distribution processing system for processing, update the status of the order to be processed in the current database to 2, and set the status to 1 after the processing is completed

  • You can specify how many pieces of data to update each time

Traffic reduction (redis implements flash sales)

  • Use queue data Structure

    • lpush/rpush Put the data into the list
    • lpop/rpop Remove the data from the list and get the removed value
    • ltrim Keep the specified interval Elements within
    • llen Get the length of the list
    • lset Set the value of the list by index
    • lindex Get the value in the list by index
    • lrange Get the specified range The elements
  • are illustrated as follows
    Detailed explanation of the implementation and application of PHP message queue (with flow chart)

  • ##The code flow is as follows

    • The flash kill program writes the request to redis(uid, time)

    • Check the length of the redis list storage, if it exceeds 10, it will be discarded directly

    • Read redis data through an infinite loop and store it in the database

      // Spike.php 秒杀程序if(Redis::llen('lottery') <pre class="brush:php;toolbar:false">// Warehousing.php 入库程序while(true){
          $user = Redis::rpop('lottery');
          if (!$user || $user == 'nil') {
              sleep(2);
              continue;
          }
          $user_arr = explode($user, '%');
          $insert_user = [
              'uid' => $user_arr[0],
              'time' => $user_arr[1]
          ];
          $res = DB::table('lottery_queue')->insert($insert_user);
          if (!$res) {
              Redis::lpush('lottery', $user);
          }}
  • If the concurrency in the above code is too large, there will be an oversold situation. At this time, you can Use file locks or redis distributed locks for control. First put the product into the redis list and use rpop to take it out. If you cannot get it, it means it has been sold out.

  • Specific ideas and pseudo The code is as follows

      // 先将商品放入redis中
      $goods_id = 2;
    
      $sql = select id,num from goods where id = $goods_id;
      $res = DB::select($sql);
      if (!empty($res)) {
          // 也可以指定多少件
          Redis::del('lottery_goods' . $goods_id);
          for($i=0;$i<pre class="brush:php;toolbar:false">  // 开始秒杀
      $count = Redis::rpop('lottery_goods' . $goods_id);
      if (!$count) {
          // 商品已抢完
          ...
      }
    
      // 用户抢购队列
      $user_list = 'user_goods_id_' . $goods_id;
      $user_status = Redis::sismember($user_list, $user_id);
      if ($user_status) {
          // 已抢过
          ...
      }
    
      // 将抢到的放到列表中
      Redis::sadd($user_list, $uid);
      $msg = '用户:' . $uid . '顺序' . $count;
      Log::info($msg);
      // 生成订单等
      ...
      // 减库存
      $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超卖
      DB::update($sql)
      // 抢购成功

##rabbitmq

    Architecture and Principle

  • where P represents production Or, X is the switch (channel), C represents the consumerDetailed explanation of the implementation and application of PHP message queue (with flow chart)

  • Simple use
  •   // Send.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      // 创建通道
      $channel = $connection->channel();
      // 声明一个队列
      $channel->queue_declare('user_email', false, false, false, false);
      // 制作消息
      $msg = new AMQPMessage('send email');
      // 将消息推送到队列
      $channel->basic_publish($msg, '', 'user_email');
    
      echo '[x] send email';
    
      $channel->close();
      $connection->close();
      // Receive.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      //创建通道
      $channel = $connection->channel();
    
      $channel->queue_declare('user_email', false, false, false, false);
    
      // 当收到消息时的回调函数
      $callback = function($msg){
          //发送邮件
          echo 'Received '.$msg->body.'\n';
      };
    
      $channel->basic_consume('user_email', '', false, true, false, false, $callback);
    
      // 保持监听状态
      while($channel->is_open()){
          $channel->wait();
      }

The above is the detailed content of Detailed explanation of the implementation and application of PHP message queue (with flow chart). For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:learnku.com. If there is any infringement, please contact admin@php.cn delete