ホームページ  >  記事  >  バックエンド開発  >  mixphp を使用してマルチプロセスの非同期メール送信を作成する

mixphp を使用してマルチプロセスの非同期メール送信を作成する

不言
不言オリジナル
2018-07-06 16:14:421800ブラウズ

この記事では、主に mixphp を使用して複数プロセスの非同期メール送信を作成する方法を紹介します。これには一定の参考値があります。今、あなたに共有します。必要な友人はそれを参照できます。

メール送信は非常に一般的な要件です。電子メールの送信操作には一般に時間がかかるため、ユーザー エクスペリエンスを向上させるために通常は非同期処理が使用され、非同期処理は通常、メッセージ キューを使用して実装されます。

マルチプロセス開発機能がないため、従来の MVC フレームワークでは通常、同じスクリプトを複数回実行して複数のプロセスを生成しますが、Mixphp はマルチプロセス開発専用に TaskExecutor をカプセル化しており、ユーザーは完全に機能し、可用性の高いマルチプロセス アプリケーションです。

次に、ナレッジ ポイントを含む非同期電子メール送信システムの開発プロセスを示します。

  • 非同期

  • メッセージ キュー

  • 複数のプロセス

  • デーモン プロセス

メッセージ キューを使用して非同期を実現する方法

PHP によるメッセージ キューの使用は、通常、ミドルウェアを使用して実装されます。一般的に使用されるメッセージ ミドルウェアは次のとおりです:

  • redis

  • rabbitmq

  • kafka

今回は非同期メール送信を実装するためにredisを選択します redisのデータ型にはリスト型があり、メッセージキューを実装することができます次のコマンドを使用します:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

アーキテクチャ デザイン

この例では、従来の MVC フレームワークを使用して電子メール送信要件を提供し、MixPHP マルチプロセスを使用して送信タスクを実行します。

メール送信ライブラリの選択

従来は、フレームワークが提供するメール送信ライブラリを使用するか、オンラインで他のユーザーが共有しているライブラリをダウンロードするのが一般的でしたが、composerの登場以降、https:/ /packagist.org/ インターネット上には高品質のライブラリがたくさんありますが、必要なのは最適なものを選択することだけですが、ここでは swiftmailer を選択します。

送信タスクはMixPHPで実行されるため、MixPHPプロジェクトにswiftmailerがインストールされますので、プロジェクトのルートディレクトリに以下のコマンドを実行してインストールしてください:

composer require swiftmailer/swiftmailer

Producerdevelopment

メール送信の要件において、プロデューサーとは、送信タスクを配信する側を指します。通常、この側はインターフェイスまたは Web ページです。この部分は、必ずしも mixphp で開発する必要はありません。TP、CI、YII はすべて、インターフェイスまたは Web ページに追加するだけで、タスク情報をメッセージ キューに投稿するだけで済みます。

従来の MVC フレームワークのコントローラーに次のコードを追加します:

通常、フレームワークで redis を使用すると、使用するクラス ライブラリがインストールされます。この例では、理解しやすいようにネイティブ コードを使用しています。
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常、非同期開発では配信完了後すぐにユーザーにメッセージが返信されますが、当然この時点ではタスクは実行されません。

コンシューマ開発

この例では、MixPHP のマルチプロセス開発ツール TaskExecutor を使用して、この要件を完了します。通常、キューの消費を処理するために常駐プロセスが使用されるため、TaskExecutor の TYPE_DAEMON タイプを使用します。 MODE_PUSH モード。

TaskExecutor の MODE_PUSH モードには 2 つのプロセスがあります。

  • 左プロセス: メッセージ キューからタスク データを取り出し、それを中間プロセスに入れる役割を担います。

  • 中プロセス: 電子メール送信タスクの実行を担当します。

PushCommand.php コードは次のとおりです。

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = &#39;smtpdm.aliyun.com&#39;;
    const PORT = 465;
    const SECURITY = &#39;ssl&#39;;
    const USERNAME = &#39;****@email.***.com&#39;;
    const PASSWORD = &#39;****&#39;;

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                &#39;class&#39;         => &#39;mix\task\TaskExecutor&#39;,
                // 服务名称
                &#39;name&#39;          => "mix-daemon: {$this->programName}",
                // 执行类型
                &#39;type&#39;          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                &#39;mode&#39;          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                &#39;leftProcess&#39;   => 1,
                // 中进程数
                &#39;centerProcess&#39; => 5,
                // 任务超时时间 (秒)
                &#39;timeout&#39;       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on(&#39;LeftStart&#39;, [$this, &#39;onLeftStart&#39;]);
        $service->on(&#39;CenterStart&#39;, [$this, &#39;onCenterStart&#39;]);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

Test

  1. シェルでプッシュ常駐プログラムを開始します。

[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
  1. インターフェイスを呼び出してタスクをメッセージ キューに入れます。

この時点で、シェル端末は次のように出力します:

mixphp を使用してマルチプロセスの非同期メール送信を作成する

テスト電子メールを正常に受信しました:

mixphp を使用してマルチプロセスの非同期メール送信を作成する

#以上がこの記事の全内容です。皆様の学習に少しでもお役に立てれば幸いです。関連コンテンツの詳細については、PHP 中国語 Web サイトをご覧ください。

関連する推奨事項:

PHP の shmop 拡張機能を有効にして共有メモリを実装する

php は共有メモリ プロセス通信関数を実装します ( _shm )

以上がmixphp を使用してマルチプロセスの非同期メール送信を作成するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。