首頁  >  文章  >  後端開發  >  詳解PHP訊息佇列的實作以及運用(附流程圖)

詳解PHP訊息佇列的實作以及運用(附流程圖)

藏色散人
藏色散人轉載
2022-10-27 16:00:095292瀏覽

訊息佇列的概念、原則、實作方式

#概念

  • 佇列結構的一個中間件
  • 不需要立即消費訊息
  • 由消費者或訂閱者進行依序消費

基本的流程圖如下所示

  • 流程
    詳解PHP訊息佇列的實作以及運用(附流程圖)

應用場景

  • 冗餘
  • 解耦
  • 流量削峰
  • 非同步通訊

#實作方式

  • mysql:可靠、速度慢
  • redis:速度快,對於大訊息包處理較慢
  • 訊息系統:可靠、專業性強

##訊息的觸發機制

    死循環的方式,故障時無法及時恢復
  • 定時任務:壓力均分、但是處理量有上限
  • 守護程式的方式

解耦(訂單與配送系統)

  • #架構設計1 採用定時任務的方式


    詳解PHP訊息佇列的實作以及運用(附流程圖)

  • 使用配送處理系統進行處理時,將目前資料庫中需要處理的訂單狀態更新為2,待處理完成後將狀態設為1

  • 可以每次指定更新多少條資料

流量削鋒(redis實作秒殺)

  • 使用佇列的數據結構

      lpush/rpush 將資料放入清單中
    • lpop/rpop 將資料移除清單並取得到移除的值
    • ltrim 保留指定區間內的元素
    • llen 取得清單長度
    • lset 透過索引設定清單的值
    • lindex 透過索引取得清單中的值
    • lrange 取得指定範圍的元素
  • 圖示如下


    詳解PHP訊息佇列的實作以及運用(附流程圖)

  • 程式碼流程如下

    • #秒殺程式將請求寫入redis(uid,time)

    • 檢查redis清單存放的長度,超過10個直接捨棄

    • 透過死循環讀取redis數據,並存入資料庫

      // Spike.php 秒杀程序if(Redis::llen('lottery') <pre class="brush:php;toolbar:false">// Warehousing.php 入库程序while(true){
          $user = Redis::rpop('lottery');
          if (!$user || $user == 'nil') {
              sleep(2);
              continue;
          }
          $user_arr = explode($user, '%');
          $insert_user = [
              'uid' => $user_arr[0],
              'time' => $user_arr[1]
          ];
          $res = DB::table('lottery_queue')->insert($insert_user);
          if (!$res) {
              Redis::lpush('lottery', $user);
          }}
  • #上述程式碼中假如並發過大的話會存在超賣的情況,此時可以使用檔案鎖定或redis分散式鎖定進行控制,先將商品放入redis list中使用rpop進行取出,如果取不到則表示已經賣完

  • 具體的思路及偽程式碼如下

      // 先将商品放入redis中
      $goods_id = 2;
    
      $sql = select id,num from goods where id = $goods_id;
      $res = DB::select($sql);
      if (!empty($res)) {
          // 也可以指定多少件
          Redis::del('lottery_goods' . $goods_id);
          for($i=0;$i<pre class="brush:php;toolbar:false">  // 开始秒杀
      $count = Redis::rpop('lottery_goods' . $goods_id);
      if (!$count) {
          // 商品已抢完
          ...
      }
    
      // 用户抢购队列
      $user_list = 'user_goods_id_' . $goods_id;
      $user_status = Redis::sismember($user_list, $user_id);
      if ($user_status) {
          // 已抢过
          ...
      }
    
      // 将抢到的放到列表中
      Redis::sadd($user_list, $uid);
      $msg = '用户:' . $uid . '顺序' . $count;
      Log::info($msg);
      // 生成订单等
      ...
      // 减库存
      $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超卖
      DB::update($sql)
      // 抢购成功

rabbitmq

  • 架構及原理


    ##其中P代表生產者,X為交換器(channal),C代表消費者詳解PHP訊息佇列的實作以及運用(附流程圖)

  • 簡單使用
  •   // Send.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      // 创建通道
      $channel = $connection->channel();
      // 声明一个队列
      $channel->queue_declare('user_email', false, false, false, false);
      // 制作消息
      $msg = new AMQPMessage('send email');
      // 将消息推送到队列
      $channel->basic_publish($msg, '', 'user_email');
    
      echo '[x] send email';
    
      $channel->close();
      $connection->close();
      // Receive.php
      require_once __DIR__.'/vendor/autoload.php';
    
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
    
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    
      //创建通道
      $channel = $connection->channel();
    
      $channel->queue_declare('user_email', false, false, false, false);
    
      // 当收到消息时的回调函数
      $callback = function($msg){
          //发送邮件
          echo 'Received '.$msg->body.'\n';
      };
    
      $channel->basic_consume('user_email', '', false, true, false, false, $callback);
    
      // 保持监听状态
      while($channel->is_open()){
          $channel->wait();
      }

以上是詳解PHP訊息佇列的實作以及運用(附流程圖)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:learnku.com。如有侵權,請聯絡admin@php.cn刪除