이 글은 주로 참조할만한 가치가 있는 Laravel의 대기열 시스템 소개를 소개합니다. 이제는 모든 사람과 공유합니다. 도움이 필요한 친구들이 참고할 수 있습니다.
Laravel 대기열은 Beanstalk와 같은 다양한 백그라운드 대기열 서비스에 대한 통합 API를 제공합니다. Amazon SQS, Redis 및 기타 관계형 데이터베이스 기반 대기열. 대기열의 목적은 이메일 전송과 같이 시간이 많이 걸리는 작업의 처리를 지연시켜 웹 요청 및 응답 시간을 크게 단축하는 것입니다.
큐 구성 파일은 config/queue.php
에 저장됩니다. 데이터베이스, Beanstalkd, Amazon SQS, Redis 및 동기(로컬 사용) 드라이버를 포함하여 각 대기열 드라이버에 대한 구성을 이 파일에서 찾을 수 있습니다. 또한 대기열을 포기하는 작업을 위한 null
대기열 드라이버도 포함되어 있습니다. config/queue.php
。 每一种队列驱动的配置都可以在该文件中找到, 包括数据库, Beanstalkd, Amazon SQS, Redis, 以及同步(本地使用)驱动。 其中还包含了一个null
队列驱动用于那些放弃队列的任务。
在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。在你的 config/queue.php
配置文件里, 有一个 connections
配置选项。 这个选项给 Amazon SQS, Beanstalk ,或者 Redis 这样的后端服务定义了一个特有的连接。不管是哪一种,一个给定的连接可能会有多个「队列」,而 「队列」可以被认为是不同的栈或者大量的队列任务。
要注意的是, queue
配置文件中每个连接的配置示例中都包含一个 queue
属性。这是默认队列,任务被发给指定连接的时候会被分发到这个队列中。换句话说,如果你分发任务的时候没有显式定义队列,那么它就会被放到连接配置中 queue
属性所定义的队列中:
// 这个任务将被分发到默认队列...dispatch(new Job);// 这个任务将被发送到「emails」队列...dispatch((new Job)->onQueue('emails'));
有些应用可能不需要把任务发到不同的队列,而只发到一个简单的队列中就行了。但是把任务推到不同的队列仍然是非常有用的,因为 Laravel 队列处理器允许你定义队列的优先级,所以你能给不同的队列划分不同的优先级或者区分不同任务的不同处理方式了。比如说,如果你把任务推到 high
队列中,你就能让队列处理器优先处理这些任务了:
php artisan queue:work --queue=high,default
要使用 database
这个队列驱动的话, 你需要创建一个数据表来存储任务,你可以用 queue:table
这个 Artisan 命令来创建这个数据表的迁移。 当迁移创建好以后,就可以用 migrate
这条命令来创建数据表:
php artisan queue:table php artisan migrate
为了使用 redis
队列驱动, 你需要在你的配置文件 config/database.php
中配置Redis的数据库连接
如果你的 Redis 队列连接使用的是 Redis 集群, 你的队列名称必须包含 key hash tag 。 这是为了确保所有的 redis 键对于一个给定的队列都置于同一哈希中:
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90,],
在使用列表里的队列服务前,必须安装以下依赖扩展包:
Amazon SQS: aws/aws-sdk-php ~3.0
Beanstalkd: pda/pheanstalk ~3.0
Redis: predis/predis ~1.0
在你的应用程序中,队列的任务类都默认放在 app/Jobs
目录下,如果这个目录不存在,那当你运行 make:job
artisan 命令时目录就会被自动创建。 你可以用以下的 Artisan 命令来生成一个新的队列任务:
php artisan make:job SendReminderEmail
生成的类实现了 IlluminateContractsQueueShouldQueue
接口,这意味着这个任务将会被推送到队列中,而不是同步执行。
任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle
方法。我们来看一个示例的任务类,这个示例里,假设我们管理着一个播客发布服务,在发布之前需要处理上传播客文件:
<?phpnamespace App\Jobs;use App\Podcast;use App\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 创建一个新的任务实例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 运行任务。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... }}
注意,在这个例子中,我们在任务类的构造器中直接传递了一个 Eloquent 模型。因为我们在任务类里引用了 SerializesModels
config/queue.php
구성 파일에는 connections
구성 옵션이 있습니다. 이 옵션은 Amazon SQS, Beanstalk 또는 Redis와 같은 백엔드 서비스에 대한 고유한 연결을 정의합니다. 어느 쪽이든 특정 연결에 대해 여러 개의 "대기열"이 있을 수 있으며 "대기열"은 서로 다른 스택 또는 대기열에 있는 많은 수의 작업으로 간주될 수 있습니다. 🎜🎜 queue
구성 파일의 각 연결 구성 예에는 queue
속성이 포함되어 있다는 점에 유의하세요. 이는 기본 대기열이며 작업이 지정된 연결로 전송될 때 이 대기열로 배포됩니다. 즉, 작업을 배포할 때 대기열을 명시적으로 정의하지 않으면 연결 구성의 queue
속성으로 정의된 대기열에 배치됩니다. 🎜<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... dispatch(new ProcessPodcast($podcast)); }}🎜일부 애플리케이션에서는 그럴 필요가 없을 수도 있습니다. 작업은 다른 대기열로 전송되지만 단순 대기열로만 전송됩니다. 그러나 작업을 다른 큐에 푸시하는 것은 여전히 매우 유용합니다. Laravel 큐 핸들러를 사용하면 큐의 우선순위를 정의할 수 있으므로 다른 큐에 다른 우선순위를 할당하거나 다른 작업에 대해 다른 처리 방법을 구별할 수 있습니다. 예를 들어 작업을
high
대기열에 푸시하면 대기열 프로세서가 다음 작업의 우선순위를 지정하도록 할 수 있습니다. 🎜<?phpnamespace App\Http\Controllers;use Carbon\Carbon;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast)) ->delay(Carbon::now()->addMinutes(10)); dispatch($job); }}🎜🎜🎜
database
큐 드라이버를 사용하려면 작업을 저장할 데이터 테이블을 생성해야 합니다. queue:table
Artisan 명령을 사용하여 이 마이그레이션을 생성할 수 있습니다. 데이터 테이블. 마이그레이션이 생성된 후 migration
명령을 사용하여 데이터 테이블을 생성할 수 있습니다. 🎜<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast))->onQueue('processing'); dispatch($job); }}
redis
대기열을 사용하려면 드라이버를 사용하려면 구성 파일 config/database.php
🎜🎜에서 Redis 데이터베이스 연결을 구성해야 합니다. Redis 대기열 연결이 Redis 클러스터를 사용하는 경우 대기열 이름에 키 해시 태그가 포함되어야 합니다. 이는 특정 대기열에 대한 모든 Redis 키가 동일한 해시에 배치되도록 하기 위한 것입니다: 🎜<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast))->onConnection('sqs'); dispatch($job); }}
🎜🎜
aws/aws-sdk-php ~3.0 🎜
pda/pheanstalk ~3.0
🎜predis/predis ~1.0
🎜 app/Jobs
디렉터리가 없으면 make:job
artisan 명령을 실행할 때 디렉터리가 자동으로 생성됩니다. 다음 Artisan 명령어를 사용하여 새 대기열 작업을 생성할 수 있습니다: 🎜$job = (new ProcessPodcast($podcast)) ->onConnection('sqs') ->onQueue('processing');🎜생성된 클래스는
IlluminateContractsQueueShouldQueue
인터페이스를 구현합니다. 이는 작업이 동기적으로 실행되기보다는 대기열에 푸시된다는 의미입니다. 🎜🎜🎜🎜handle
메서드만 포함합니다. 예제 작업 클래스를 살펴보겠습니다. 이 예제에서는 팟캐스트 게시 서비스를 관리하고 게시하기 전에 업로드된 팟캐스트 파일을 처리해야 한다고 가정합니다. 🎜php artisan queue:work --tries=3🎜 이 예제에서는 작업 클래스의 생성자를 An에 직접 사용합니다. Eloquent 모델이 통과되었습니다. 작업 클래스에서
SerializesModels
를 참조했기 때문에 Eloquent 모델은 작업을 처리할 때 정상적으로 직렬화 및 역직렬화될 수 있습니다. 대기열 작업 클래스가 생성자에서 Eloquent 모델을 받으면 해당 모델을 식별하는 속성만 대기열에 직렬화됩니다. 작업이 실제로 실행되면 대기열 시스템은 데이터베이스에서 전체 모델을 자동으로 검색합니다. 이 전체 프로세스는 애플리케이션에 완전히 투명하므로 전체 Eloquent 패턴 인스턴스 직렬화와 관련된 일부 문제를 피할 수 있습니다. 🎜在队列处理任务时,会调用 handle
方法,而这里我们也可以通过 handle 方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。
{note} 像图片内容这种二进制数据, 在放入队列任务之前必须使用
base64_encode
方法转换一下。 否则,当这项任务放置到队列中时,可能无法正确序列化为 JSON。
你写好任务类后,就能通过 dispatch
辅助函数来分发它了。唯一需要传递给 dispatch
的参数是这个任务类的实例:
<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... dispatch(new ProcessPodcast($podcast)); }}
{tip}
dispatch
提供了一种简捷、全局可用的函数,它也非常容易测试。查看下 Laravel 测试文档 来了解更多。
如果你想延迟执行一个队列中的任务,你可以用任务实例的 delay
方法。 这个方法是 Illuminate\Bus\Queueable
trait 提供的,而这个 trait 在所有自动生成的任务类中都是默认加载了的。对于延迟任务我们可以举个例子,比如指定一个被分发10分钟后才执行的任务:
<?phpnamespace App\Http\Controllers;use Carbon\Carbon;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast)) ->delay(Carbon::now()->addMinutes(10)); dispatch($job); }}
{note} Amazon SQS 队列服务最大延迟 15 分钟。
通过推送任务到不同的队列,你可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。记住,这个并不是要推送任务到队列配置文件中不同的 「connections」 里,而是推送到一个连接中不同的队列里。要指定队列的话,就调用任务实例的 onQueue
方法:
<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast))->onQueue('processing'); dispatch($job); }}
如果你使用了多个队列连接,你可以把任务推到指定连接。要指定连接的话,你可以调用任务实例的 onConnection
方法:
<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 保存一个新的播客。 * * @param Request $request * @return Response */ public function store(Request $request) { // 创建播客... $job = (new ProcessPodcast($podcast))->onConnection('sqs'); dispatch($job); }}
当然,你可以链式调用 onConnection
和 onQueue
来同时指定任务的连接和队列:
$job = (new ProcessPodcast($podcast)) ->onConnection('sqs') ->onQueue('processing');
在一项任务中指定最大的尝试次数可以尝试通过 Artisan 命令行 --tries
来设置:
php artisan queue:work --tries=3
但是,你可以采取更为精致的方法来完成这项工作比如说在任务类中定义最大尝试次数。如果在类和命令行中都定义了最大尝试次数, Laravel 会优先执行任务类中的值:
<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任务最大尝试次数 * * @var int */ public $tries = 5;}
同样的,任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout
开关指定:
php artisan queue:work --timeout=30
然而,你也可以在任务类中定义一个变量来设置可运行的最大描述,如果在类和命令行中都定义了最大尝试次数, Laravel 会优先执行任务类中的值:
<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任务运行的超时时间。 * * @var int */ public $timeout = 120;}
如果任务运行的时候抛出异常,这个任务就自动被释放回队列,这样它就能被再重新运行了。如果继续抛出异常,这个任务会继续被释放回队列,直到重试次数达到你应用允许的最多次数。这个最多次数是在调用 queue:work
Artisan 命令的时候通过 --tries
参数来定义的。更多队列处理器的信息可以 在下面看到 。
Laravel 包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过 queue:work Artisan 命令来运行处理器。要注意,一旦 queue:work
命令开始,它将一直运行,直到你手动停止或者你关闭控制台:
php artisan queue:work
{tip} 要让
queue:work
进程永久在后台运行,你应该使用进程监控工具,比如Supervisor
来保证队列处理器没有停止运行。
一定要记得,队列处理器是长时间运行的进程,并在内存里保存着已经启动的应用状态。这样的结果就是,处理器运行后如果你修改代码那这些改变是不会应用到处理器中的。所以在你重新部署过程中,一定要 重启队列处理器 。
你可以指定队列处理器所使用的连接。你在 config/queue.php
配置文件里定义了多个连接,而你传递给 work
命令的连接名字要至少跟它们其中一个是一致的:
php artisan queue:work redis
你可以自定义队列处理器,方式是处理给定连接的特定队列。举例来说,如果你所有的邮件都是在 redis
连接中的 emails
队列中处理的,你就能通过以下命令启动一个只处理那个特定队列的队列处理器了:
php artisan queue:work redis --queue=emails
守护程序队列不会在处理每个作业之前 「重新启动」 框架。 因此,在每个任务完成后,您应该释放任何占用过大的资源。例如,如果你使用GD库进行图像处理,你应该在完成后用 imagedestroy
释放内存。
有时候你希望设置处理队列的优先级。比如在 config/queue.php
里你可能设置了 redis
连接中的默认队列优先级为 low
,但是你可能偶尔希望把一个任务推到 high
优先级的队列中,像这样:
dispatch((new Job)->onQueue('high'));
要验证 high
队列中的任务都是在 low
队列中的任务之前处理的,你要启动一个队列处理器,传递给它队列名字的列表并以英文逗号,间隔:
php artisan queue:work --queue=high,low
因为队列处理器都是 long-lived 进程,如果代码改变而队列处理器没有重启,他们是不能应用新代码的。所以最简单的方式就是重新部署过程中要重启队列处理器。你可以很优雅地只输入 queue:restart
来重启所有队列处理器。
php artisan queue:restart
这个命令将会告诉所有队列处理器在执行完当前任务后结束进程,这样才不会有任务丢失。因为队列处理器在执行 queue:restart
命令时对结束进程,你应该运行一个进程管理器,比如 Supervisor 来自动重新启动队列处理器。
config/queue.php
配置文件里,每一个队列连接都定义了一个 retry_after
选项。这个选项指定了任务最多处理多少秒后就被当做失败重试了。比如说,如果这个选项设置为 90
,那么当这个任务持续执行了 90
秒而没有被删除,那么它将被释放回队列。通常情况下,你应该把 retry_after
设置为最长耗时的任务所对应的时间。
{note} 唯一没有
retry_after
选项的连接是 Amazon SQS。当用 Amazon SQS 时,你必须通过 Amazon 命令行来配置这个重试阈值。
queue:work
Artisan 命令对外有一个 --timeout
选项。这个选项指定了 Laravel
队列处理器最多执行多长时间后就应该被关闭掉。有时候一个队列的子进程会因为很多原因僵死,比如一个外部的 HTTP 请求没有响应。这个 --timeout
选项会移除超出指定事件限制的僵死进程。
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
命令行选项是不一样的,但是可以同时工作来保证任务不会丢失并且不会重复执行。
{note}
--timeout
应该永远都要比retry_after
短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。如果你的--timeout
选项大于retry_after
配置选项,你的任务可能被执行两次。
当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。 但是,如果没有新的工作可用,sleep
参数决定了工作进程将「睡眠」多长时间:
php artisan queue:work --sleep=3
Supervisor 是一个 Linux 操作系统上的进程监控软件,它会在 queue:listen
或 queue:work
命令发生失败后自动重启它们。要在 Ubuntu 安装 Supervisor,可以用以下命令:
sudo apt-get install supervisor
{tip} 如果自己手动配置 Supervisor 听起来有点难以应付,可以考虑使用 Laravel Forge ,它能给你的 Laravel 项目自动安装与配置 Supervisor。
Supervisor 的配置文件一般是放在 /etc/supervisor/conf.d
目录下,在这个目录中你可以创建任意数量的配置文件来要求 Supervisor 怎样监控你的进程。例如我们创建一个 laravel-worker.conf
来启动与监控一个 queue:work
进程:
[program:laravel-worker]process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3autostart=trueautorestart=trueuser=forge numprocs=8redirect_stderr=truestdout_logfile=/home/forge/app.com/worker.log
这个例子里的 numprocs
命令会要求 Supervisor 运行并监控 8 个 queue:work
进程,并且在它们运行失败后重新启动。当然,你必须更改 command
命令的 queue:work sqs
,以显示你所选择的队列驱动。
当这个配置文件被创建后,你需要更新 Supervisor 的配置,并用以下命令来启动该进程:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
更多有关 Supervisor 的设置与使用,请参考 Supervisor 官方文档。
有时候你队列中的任务会失败。不要担心,本来事情就不会一帆风顺。 Laravel 内置了一个方便的方式来指定任务重试的最大次数。当任务超出这个重试次数后,它就会被插入到 failed_jobs
数据表里面。要创建 failed_jobs
表的话,你可以用 queue:failed-table
命令:
php artisan queue:failed-table php artisan migrate
然后运行队列处理器,在调用 queue:work 命令时你应该通过 --tries
参数指定任务的最大重试次数。如果不指定,任务就会永久重试:
php artisan queue:work redis --tries=3
你可以在任务类里直接定义 failed
方法,它能在任务失败时运行任务的清除逻辑。这个地方用来发一条警告给用户或者重置任务执行的操作等再好不过了。导致任务失败的异常信息会被传递到 failed
方法:
<?phpnamespace App\Jobs;use Exception;use App\Podcast;use App\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 创建一个新的任务实例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 执行任务。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 处理上传播客... } /** * 要处理的失败任务。 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 给用户发送失败通知,等等... }}
任务失败事件
如果你想注册一个当队列任务失败时会被调用的事件,则可以用 Queue::failing
方法。这样你就有机会通过这个事件来用 e-mail 或 HipChat 通知你的团队。例如我们可以在 Laravel 内置的 AppServiceProvider
中对这个事件附加一个回调函数:
<?phpnamespace App\Providers;use Illuminate\Support\Facades\Queue;use Illuminate\Queue\Events\JobFailed;use Illuminate\Support\ServiceProvider;class AppServiceProvider extends ServiceProvider{ /** * 启动任意应用程序的服务。 * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 注册服务提供者。 * * @return void */ public function register() { // }}
要查看你在 failed_jobs
数据表中的所有失败任务,则可以用 queue:failed
这个 Artisan 命令:
php artisan queue:failed
queue:failed
命令会列出所有任务的 ID、连接、队列以及失败时间,任务 ID 可以被用在重试失败的任务上。例如要重试一个 ID 为 5
的失败任务,其命令如下:
php artisan queue:retry 5
要重试所有失败的任务,可以使用 queue:retry
并使用 all
作为 ID:
php artisan queue:retry all
如果你想删除掉一个失败任务,可以用 queue:forget
命令:
php artisan queue:forget 5
queue:flush
命令可以让你删除所有失败的任务:
php artisan queue:flush
使用队列的 before
和 after
方法,你能指定任务处理前和处理后的回调处理。在这些回调里正是实现额外的日志记录或者增加统计数据的好时机。通常情况下,你应该在 服务容器 中调用这些方法。例如,我们使用 Laravel 中的 AppServiceProvider:
<?phpnamespace App\Providers;use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobProcessed;use Illuminate\Queue\Events\JobProcessing;class AppServiceProvider extends ServiceProvider{ /** * 启动任意服务。 * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * 注册服务提供者。 * * @return void */ public function register() { // }}
在 队列
facade 中使用 looping
方法,你可以尝试在队列获取任务之前执行指定的回调方法。举个例子,你可以用闭包来回滚之前已失败任务的事务。
Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); }});
相关推荐:
위 내용은 Laravel의 대기열 시스템 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!