搜索
首页后端开发php教程关于 Laravel Redis 多个进程同时取队列的问题详解

这篇文章主要给大家介绍了关于 Laravel Redis 多个进程同时取队列问题的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或工作具有一定的参考学习价值,需要的朋友下面来一起学习学习吧。

前言

最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。

使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
autostart=true
autorestart=true
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/xxx.cn/worker.log

注意: numprocs = 8,代表开启 8 个进程来执行 command 中的命令。

如下:

PS C:\Users\tanteng\website\laradock> docker-compose exec php-worker sh
/etc/supervisor/conf.d # ps -ef | grep php
 7 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 8 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 9 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 10 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 11 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 12 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 13 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 14 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 44 root  0:00 grep php

Laravel 多进程读取队列内容是否会重复

在 Laravel 的某个控制器方法,一次放入多个任务队列:

public function index(Request $request)
{
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
}

在队列处理的方法打印日志,打印处理的队列的 ID:

app/Jobs/SendFile3.php

public function handle()
{
 info('invoke SendFile3');
 dump('invoke handle');
 $rawbody = $this->job->getRawBody();
 $info = json_decode($rawbody, true);
 info('queue id:' . $info['id']);
}

Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:

{
 "job": "Illuminate\\Queue\\CallQueuedHandler@call",
 "data": {
 "commandName": "App\\Jobs\\SendFile3",
 "command": "O:18:\"App\\Jobs\\SendFile3\":4:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";N;}"
 },
 "id": "hadBcy3IpNsnOofQQdHohsa451OkQs88",
 "attempts": 1
}

请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:

这个时候开启 Supervisor 处理队列任务,并查看日志:

[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:JaClJzhDEvntzLCRIz6uRQkCVLbE8Y9C
[2017-12-23 19:01:01] local.INFO: queue id:ukHv0Li4P2VgPa55qU6yEOJM27Mo5YwJ
[2017-12-23 19:01:01] local.INFO: queue id:ObMpwDTmnaveBUkU7aan5abt3Agyt90l
[2017-12-23 19:01:01] local.INFO: queue id:fo2qZn2ftSdQtdnKOciMK7iJb4qlhRGE
[2017-12-23 19:01:01] local.INFO: queue id:uLjFMoOU7Wk7bOAd4zpHb3ccRMJHBtR6
[2017-12-23 19:01:01] local.INFO: queue id:87ULqPBObFmGr16nl5wxFVOi71zGCeRM
[2017-12-23 19:01:01] local.INFO: queue id:9UVl0muQLzBqlRI99rChGW2ElXwVEMIE
[2017-12-23 19:01:01] local.INFO: queue id:a0vgyZuz9HtmH7DGHEpXqesFTcQU3QAF
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:2cXuXxopPkgYiV4WO8gv9CJ6CwXeKtYL
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:9acTAYa8cxpJX6Q3Gb1sULokotP8reqZ
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:BPHQvBboChlv4gr2I0vyLVyw9bijtTYJ
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:Fm6tNajdxYKtdQbDMYDmwWJFLnNikRyg
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:nyAbcvSkBVPbaH3e2ItQkoLJlP1ficib
[2017-12-23 19:01:01] local.INFO: queue id:WBHsSVZtP43569UoPXxfLLJcvYmPW7cP
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:bliPnKcRSDApwVmKLNxEhaKelhm0RDEY
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:eOAoQucEIwRz9uZ64xm6IDKgiqj9Xc3W
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:lzise9EiqQqINrhALbmAI4qNg7qylpb2
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:WXYKvcfOhS1pPnwOwUTsenoMv5l5EUXe
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:XtH5JiwLgnrwWzI02Oyi70pihAOkuJUD
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:9ehmE5HImlpNubpY0xWN8UVrOzxeMqws
[2017-12-23 19:01:01] local.INFO: queue id:C1sK87cpZl47edLA0zhfo7PJ9MIEcoyx
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:2kwl51oH4lyyRrljCReGUCkNiJRDl7oe
[2017-12-23 19:01:01] local.INFO: queue id:ObRpoqrYTPYiyv2delMlOXu3sAPpWJlN
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:6qgu6W3TapLjSrt688yv9HRXvDDLxntz
[2017-12-23 19:01:01] local.INFO: queue id:wiTlERhwn7s9cQkfUF9lLlNADpXjKncI
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:ZSLW0VLFBDpL4wjTJzu3Yb3V45pNe807
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:qhZlXLGfGWRluIeNm7VbllmTJZYb2h5n
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:LUx1IByD3L2psNl9BZwHhk2knXyRPzW6
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:M2RESPjyo5hpAFxxL0EQbWwsUq4jpmWn
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:hUsGaiIAOO6ZfGQc5kGHGpsv5RpoRPYO
[2017-12-23 19:01:01] local.INFO: queue id:cEHJsOy6bLeZ4NbncPziaHqlarMeyyEF
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:w4bkFiJKMU5saqG2xKN3ZRL5BYXGATMk
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:0zBuwbxlrEhhxKfYBkVyTY4z35f154sI
[2017-12-23 19:01:01] local.INFO: queue id:mvoZvyDPvq4tcPjEy9G7PMtH3MwPkPik
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:TLvF74eeidECWKtjZqWvW03UJTRPTL9r
[2017-12-23 19:01:01] local.INFO: queue id:me8wyPfgcz0nf9xvcXz0hf2xVxqa1FFS

这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。

分析一下 Laravel 队列的处理

Laravel 中入队列方法

public function pushRaw($payload, $queue = null, array $options = [])
{
 $this->getConnection()->rpush($this->getQueue($queue), $payload);
 
 return Arr::get(json_decode($payload, true), 'id');
}

用的是 Redis 的 rpush 命令。

Laravel 中取队列方法

public function pop($queue = null)
{
 $original = $queue ?: $this->default; 
 $queue = $this->getQueue($queue); 
 $this->migrateExpiredJobs($queue.':delayed', $queue); 
 if (! is_null($this->expire)) {
  $this->migrateExpiredJobs($queue.':reserved', $queue);
 } 
 list($job, $reserved) = $this->getConnection()->eval(
  LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
 ); 
 if ($reserved) {
  return new RedisJob($this->container, $this, $job, $reserved, $original);
 }
}

这里用的是 lua 脚本取队列,如下:

public static function pop()
{
 return <<<&#39;LUA&#39;
local job = redis.call(&#39;lpop&#39;, KEYS[1])
local reserved = false
if(job ~= false) then
reserved = cjson.decode(job)
reserved[&#39;attempts&#39;] = reserved[&#39;attempts&#39;] + 1
reserved = cjson.encode(reserved)
redis.call(&#39;zadd&#39;, KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}

那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。

总结

您可能感兴趣的文章:

php-msf源码的详解

thinkphp5 URL和路由的功能详解与实例讲解

PHP基于双向链表与排序操作实现的会员排名功能示例_php技巧

以上是关于 Laravel Redis 多个进程同时取队列的问题详解的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
如何检查PHP会话是否已经开始?如何检查PHP会话是否已经开始?Apr 30, 2025 am 12:20 AM

在PHP中,可以使用session_status()或session_id()来检查会话是否已启动。1)使用session_status()函数,如果返回PHP_SESSION_ACTIVE,则会话已启动。2)使用session_id()函数,如果返回非空字符串,则会话已启动。这两种方法都能有效地检查会话状态,选择使用哪种方法取决于PHP版本和个人偏好。

描述一个场景,其中使用会话在Web应用程序中至关重要。描述一个场景,其中使用会话在Web应用程序中至关重要。Apr 30, 2025 am 12:16 AM

sessionsarevitalinwebapplications,尤其是在commercePlatform之前。

如何管理PHP中的并发会话访问?如何管理PHP中的并发会话访问?Apr 30, 2025 am 12:11 AM

在PHP中管理并发会话访问可以通过以下方法:1.使用数据库存储会话数据,2.采用Redis或Memcached,3.实施会话锁定策略。这些方法有助于确保数据一致性和提高并发性能。

使用PHP会话的局限性是什么?使用PHP会话的局限性是什么?Apr 30, 2025 am 12:04 AM

PHPsessionshaveseverallimitations:1)Storageconstraintscanleadtoperformanceissues;2)Securityvulnerabilitieslikesessionfixationattacksexist;3)Scalabilityischallengingduetoserver-specificstorage;4)Sessionexpirationmanagementcanbeproblematic;5)Datapersis

解释负载平衡如何影响会话管理以及如何解决。解释负载平衡如何影响会话管理以及如何解决。Apr 29, 2025 am 12:42 AM

负载均衡会影响会话管理,但可以通过会话复制、会话粘性和集中式会话存储解决。1.会话复制在服务器间复制会话数据。2.会话粘性将用户请求定向到同一服务器。3.集中式会话存储使用独立服务器如Redis存储会话数据,确保数据共享。

说明会话锁定的概念。说明会话锁定的概念。Apr 29, 2025 am 12:39 AM

Sessionlockingisatechniqueusedtoensureauser'ssessionremainsexclusivetooneuseratatime.Itiscrucialforpreventingdatacorruptionandsecuritybreachesinmulti-userapplications.Sessionlockingisimplementedusingserver-sidelockingmechanisms,suchasReentrantLockinJ

有其他PHP会议的选择吗?有其他PHP会议的选择吗?Apr 29, 2025 am 12:36 AM

PHP会话的替代方案包括Cookies、Token-basedAuthentication、Database-basedSessions和Redis/Memcached。1.Cookies通过在客户端存储数据来管理会话,简单但安全性低。2.Token-basedAuthentication使用令牌验证用户,安全性高但需额外逻辑。3.Database-basedSessions将数据存储在数据库中,扩展性好但可能影响性能。4.Redis/Memcached使用分布式缓存提高性能和扩展性,但需额外配

在PHP的上下文中定义'会话劫持”一词。在PHP的上下文中定义'会话劫持”一词。Apr 29, 2025 am 12:33 AM

Sessionhijacking是指攻击者通过获取用户的sessionID来冒充用户。防范方法包括:1)使用HTTPS加密通信;2)验证sessionID的来源;3)使用安全的sessionID生成算法;4)定期更新sessionID。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

WebStorm Mac版

WebStorm Mac版

好用的JavaScript开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。