ホームページ  >  記事  >  バックエンド開発  >  mixphp を使用してマルチプロセスの非同期メール送信を作成する方法を説明します

mixphp を使用してマルチプロセスの非同期メール送信を作成する方法を説明します

藏色散人
藏色散人転載
2020-08-20 13:25:112561ブラウズ
注: これは MixPHP V1 の例です

電子メールの送信は非常に一般的な要件です。電子メールの送信操作は一般に時間がかかるため、ユーザー エクスペリエンスを向上させるために通常は非同期処理を使用します。 . を非同期的に実現するには、通常、メッセージ キューを使用します。

従来の MVC フレームワークでは、マルチプロセス開発機能が不足しているため、同じスクリプトを複数回実行して複数のプロセスを生成することが一般的でしたが、Mixphp はマルチプロセス開発専用に TaskExecutor をカプセル化しており、ユーザーは完全に機能し、可用性の高いマルチプロセス アプリケーションです。

推奨: 「PHP ビデオ チュートリアル

以下は、ナレッジ ポイントを含む非同期電子メール送信システムの開発プロセスを示しています:

  • 非同期
  • メッセージ キュー
  • 複数のプロセス
  • デーモン プロセス

メッセージ キューを使用して非同期を実装する方法

PHP通常はメッセージ キューを使用します ミドルウェアを使用して実装されます。一般的に使用されるメッセージ ミドルウェアは次のとおりです:

  • redis
  • rabbitmq
  • kafka

This非同期メール送信を実装するために redis を選択します。redis のデータ型にはメッセージキューを実装できるリスト型があります。次のコマンドを使用します:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

アーキテクチャ設計

この例 従来の MVC フレームワークは電子メール送信要件を提供し、MixPHP マルチプロセスは送信タスクを実行します。

メール送信ライブラリの選択

これまでは、フレームワークが提供するメール送信ライブラリを使用するか、オンラインで他のユーザーが共有しているライブラリをダウンロードするのが一般的でした。作曲家が登場しました。 https: //packagist.org/ には高品質のライブラリがたくさんあります。必要なのは、最適なもの (この場合は swiftmailer) を選択することだけです。

送信タスクはMixPHPで実行されるため、MixPHPプロジェクトにswiftmailerがインストールされますので、プロジェクトのルートディレクトリに以下のコマンドを実行してインストールしてください。 #Production Developer 開発

電子メール送信の要件において、プロデューサーとは、送信タスクを配信するパーティを指します。このパーティは、通常、インターフェイスまたは Web ページです。この部分は、必ずしも mixphp 開発を必要とするわけではありません、TP、CI、YII これらはすべて可能で、インターフェイスまたは Web ページのメッセージ キューにタスク情報をポストするだけです。

従来の MVC フレームワークのコントローラーに次のコードを追加します:

通常、フレームワークで redis を使用すると、使用するクラス ライブラリがインストールされます。この例では、理解しやすいようにネイティブ コードを使用しています。

composer require swiftmailer/swiftmailer

通常、非同期開発では配信完了後すぐにユーザーにメッセージが返信されますが、当然この時点ではタスクは実行されません。

コンシューマ開発

この例では、MixPHP のマルチプロセス開発ツール TaskExecutor を使用してこの要件を完了します。常駐プロセスは通常、キューの消費を処理するために使用されるため、 TaskExecutor の TYPE_DAEMON タイプと MODE_PUSH モード。

TaskExecutor の MODE_PUSH モードには 2 つのプロセスがあります。

左プロセス: メッセージ キューからタスク データを取り出し、それを中間プロセスに入れる役割を担います。

  • 中プロセス: 電子メール送信タスクの実行を担当します。

  • PushCommand.php コードは次のとおりです:

    // 连接
    $redis = new \Redis();
    if (!$redis->connect('127.0.0.1', 6379)) {
        throw new \Exception('Redis Connect Failure');
    }
    $redis->auth('');
    $redis->select(0);
    // 投递任务
    $data = [
        'to'      => ['***@qq.com' => 'A name'],
        'body'    => 'Here is the message itself',
        'subject' => 'The title content',
    ];
    $redis->lpush('queue:email', serialize($data));

  • Test

1. シェルでプッシュ常駐プログラムを開始します。

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = &#39;smtpdm.aliyun.com&#39;;
    const PORT = 465;
    const SECURITY = &#39;ssl&#39;;
    const USERNAME = &#39;****@email.***.com&#39;;
    const PASSWORD = &#39;****&#39;;

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                &#39;class&#39;         => &#39;mix\task\TaskExecutor&#39;,
                // 服务名称
                &#39;name&#39;          => "mix-daemon: {$this->programName}",
                // 执行类型
                &#39;type&#39;          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                &#39;mode&#39;          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                &#39;leftProcess&#39;   => 1,
                // 中进程数
                &#39;centerProcess&#39; => 5,
                // 任务超时时间 (秒)
                &#39;timeout&#39;       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on(&#39;LeftStart&#39;, [$this, &#39;onLeftStart&#39;]);
        $service->on(&#39;CenterStart&#39;, [$this, &#39;onCenterStart&#39;]);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}
1. インターフェイスを呼び出して、タスクをメッセージ キューに入れます。

この時点で、シェル端末は次のように出力します:

テスト電子メールを正常に受信しました: mixphp を使用してマルチプロセスの非同期メール送信を作成する方法を説明します

mixphp を使用してマルチプロセスの非同期メール送信を作成する方法を説明しますMixPHP

GitHub: https://github.com/mix-php/mix

公式 Web サイト: http: //www.mixphp.cn/

以上がmixphp を使用してマルチプロセスの非同期メール送信を作成する方法を説明しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はsegmentfault.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。