搜索
首页后端开发php教程使用 mixphp 打造多进程异步邮件发送

使用 mixphp 打造多进程异步邮件发送

Jul 06, 2018 pm 04:14 PM
emailphpprocessswoole

这篇文章主要介绍了关于使用 mixphp 打造多进程异步邮件发送,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

  • 异步

  • 消息队列

  • 多进程

  • 守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

  • redis

  • rabbitmq

  • kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor  的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = &#39;smtpdm.aliyun.com&#39;;
    const PORT = 465;
    const SECURITY = &#39;ssl&#39;;
    const USERNAME = &#39;****@email.***.com&#39;;
    const PASSWORD = &#39;****&#39;;

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                &#39;class&#39;         => &#39;mix\task\TaskExecutor&#39;,
                // 服务名称
                &#39;name&#39;          => "mix-daemon: {$this->programName}",
                // 执行类型
                &#39;type&#39;          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                &#39;mode&#39;          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                &#39;leftProcess&#39;   => 1,
                // 中进程数
                &#39;centerProcess&#39; => 5,
                // 任务超时时间 (秒)
                &#39;timeout&#39;       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on(&#39;LeftStart&#39;, [$this, &#39;onLeftStart&#39;]);
        $service->on(&#39;CenterStart&#39;, [$this, &#39;onCenterStart&#39;]);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

测试

  1. 在 shell 中启动 push 常驻程序。

[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
  1. 调用接口往消息队列投放任务。

此时 shell 终端将打印:

5710806-053c97487e5344ac[1].png

成功收到测试邮件:

5710806-6f6ce15dc9d77933[1].png

以上就是本文的全部内容,希望对大家的学习有所帮助,更多相关内容请关注PHP中文网!

相关推荐:

给PHP开启shmop扩展实现共享内存

php实现共享内存进程通信函数(_shm)

以上是使用 mixphp 打造多进程异步邮件发送的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
PHP如何识别用户的会话?PHP如何识别用户的会话?May 01, 2025 am 12:23 AM

phpientifiesauser'ssessionusessessionSessionCookiesAndSessionIds.1)whiwSession_start()被称为,phpgeneratesainiquesesesessionIdStoredInacookInAcookInamedInAcienamedphpsessidontheuser'sbrowser'sbrowser.2)thisIdAllowSphptptpptpptpptpptortoreTessessionDataAfromtheserverMtheserver。

确保PHP会议的一些最佳实践是什么?确保PHP会议的一些最佳实践是什么?May 01, 2025 am 12:22 AM

PHP会话的安全可以通过以下措施实现:1.使用session_regenerate_id()在用户登录或重要操作时重新生成会话ID。2.通过HTTPS协议加密传输会话ID。3.使用session_save_path()指定安全目录存储会话数据,并正确设置权限。

PHP会话文件默认存储在哪里?PHP会话文件默认存储在哪里?May 01, 2025 am 12:15 AM

phpsessionFilesArestoredIntheDirectorySpecifiedBysession.save_path,通常是/tmponunix-likesystemsorc:\ windows \ windows \ temponwindows.tocustomizethis:tocustomizEthis:1)useession_save_save_save_path_path()

您如何从PHP会话中检索数据?您如何从PHP会话中检索数据?May 01, 2025 am 12:11 AM

ToretrievedatafromaPHPsession,startthesessionwithsession_start()andaccessvariablesinthe$_SESSIONarray.Forexample:1)Startthesession:session_start().2)Retrievedata:$username=$_SESSION['username'];echo"Welcome,".$username;.Sessionsareserver-si

您如何使用会议来实施购物车?您如何使用会议来实施购物车?May 01, 2025 am 12:10 AM

利用会话构建高效购物车系统的步骤包括:1)理解会话的定义与作用,会话是服务器端的存储机制,用于跨请求维护用户状态;2)实现基本的会话管理,如添加商品到购物车;3)扩展到高级用法,支持商品数量管理和删除;4)优化性能和安全性,通过持久化会话数据和使用安全的会话标识符。

您如何在PHP中创建和使用接口?您如何在PHP中创建和使用接口?Apr 30, 2025 pm 03:40 PM

本文解释了如何创建,实施和使用PHP中的接口,重点关注其对代码组织和可维护性的好处。

crypt()和password_hash()有什么区别?crypt()和password_hash()有什么区别?Apr 30, 2025 pm 03:39 PM

本文讨论了PHP中的crypt()和password_hash()之间的差异,以进行密码哈希,重点介绍其实施,安全性和对现代Web应用程序的适用性。

如何防止PHP中的跨站点脚本(XSS)?如何防止PHP中的跨站点脚本(XSS)?Apr 30, 2025 pm 03:38 PM

文章讨论了通过输入验证,输出编码以及使用OWASP ESAPI和HTML净化器之类的工具来防止PHP中的跨站点脚本(XSS)。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境