Home >PHP Framework >Laravel >What should I do if the extension of laravel-swoole in laravel8 is not compatible with the message queue?
The following is an introduction to laravel-swoole message queue from the laraveltutorial column, I hope it will be helpful to friends in need!
During this period of time, I used laravel8 laravel-swoole to do projects, and found that the extension of laravel-swoole is not compatible with the message queue;
After thinking about it, what should I do? What should I do? Just write it yourself! Fortunately, the thinkphp-swoole extension is already compatible, so hey!
Directly upload the modified ideas and code! Open Do it!
One is to add another startup command or start the message queue for consumption when swoole is started. A lazy person like me can solve it with one command and will never write two commands.
First rewrite the swoole startup command
<?php namespace crmeb\swoole\command; use Illuminate\Support\Arr; use Swoole\Process; use SwooleTW\Http\Server\Facades\Server; use SwooleTW\Http\Server\Manager; use crmeb\swoole\server\InteractsWithQueue; use crmeb\swoole\server\FileWatcher; use Swoole\Runtime; class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand { use InteractsWithQueue; /** * The name and signature of the console command. * * @var string */ protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}'; /** * Run swoole_http_server. */ protected function start() { if ($this->isRunning()) { $this->error('Failed! swoole_http_server process is already running.'); return; } $host = Arr::get($this->config, 'server.host'); $port = Arr::get($this->config, 'server.port'); $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled'); $queueEnabled = Arr::get($this->config, 'queue.enabled'); $accessLogEnabled = Arr::get($this->config, 'server.access_log'); $coroutineEnable = Arr::get($this->config, 'coroutine.enable'); $this->info('Starting swoole http server...'); $this->info("Swoole http server started: <http://{$host}:{$port}>"); if ($this->isDaemon()) { $this->info( '> (You can run this command to ensure the ' . 'swoole_http_server process is running: ps aux|grep "swoole")' ); } $manager = $this->laravel->make(Manager::class); $server = $this->laravel->make(Server::class); if ($accessLogEnabled) { $this->registerAccessLog(); } //热更新重写 if ($hotReloadEnabled) { $manager->addProcess($this->getHotReloadProcessNow($server)); } //启动消息队列进行消费 if ($queueEnabled) { $this->prepareQueue($manager); } if ($coroutineEnable) { Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL)); } $manager->run(); } /** * @param Server $server * @return Process|void */ protected function getHotReloadProcessNow($server) { return new Process(function () use ($server) { $watcher = new FileWatcher( Arr::get($this->config, 'hot_reload.include', []), Arr::get($this->config, 'hot_reload.exclude', []), Arr::get($this->config, 'hot_reload.name', []) ); $watcher->watch(function () use ($server) { $server->reload(); }); }, false, 0, true); } }
InteractsWithQueue class
<?php namespace crmeb\swoole\server; use crmeb\swoole\queue\Manager as QueueManager; use SwooleTW\Http\Server\Manager; /** * Trait InteractsWithQueue * @package crmeb\swoole\server */ trait InteractsWithQueue { public function prepareQueue(Manager $manager) { /** @var QueueManager $queueManager */ $queueManager = $this->laravel->make(QueueManager::class); $queueManager->attachToServer($manager, $this->output); } }
Manager class
<?php namespace crmeb\swoole\queue; use Illuminate\Contracts\Container\Container; use Swoole\Constant; use Swoole\Process; use Swoole\Process\Pool; use Swoole\Timer; use Illuminate\Support\Arr; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; use crmeb\swoole\server\WithContainer; use Illuminate\Queue\Jobs\Job; use function Swoole\Coroutine\run; use Illuminate\Queue\WorkerOptions; use SwooleTW\Http\Server\Manager as ServerManager; use Illuminate\Console\OutputStyle; class Manager { use WithContainer; /** * Container. * * @var \Illuminate\Contracts\Container\Container */ protected $container; /** * @var OutputStyle */ protected $output; /** * @var Closure[] */ protected $workers = []; /** * Manager constructor. * @param Container $container */ public function __construct(Container $container) { $this->container = $container; } /** * @param ServerManager $server */ public function attachToServer(ServerManager $server, OutputStyle $output) { $this->output = $output; $this->listenForEvents(); $this->createWorkers(); foreach ($this->workers as $worker) { $server->addProcess(new Process($worker, false, 0, true)); } } /** * 运行消息队列命令 */ public function run(): void { @cli_set_process_title("swoole queue: manager process"); $this->listenForEvents(); $this->createWorkers(); $pool = new Pool(count($this->workers)); $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) { $process = $pool->getProcess($workerId); run($this->workers[$workerId], $process); }); $pool->start(); } /** * 创建执行任务 */ protected function createWorkers() { $workers = $this->getConfig('queue.workers', []); foreach ($workers as $queue => $options) { if (strpos($queue, '@') !== false) { [$queue, $connection] = explode('@', $queue); } else { $connection = null; } $this->workers[] = function (Process $process) use ($options, $connection, $queue) { @cli_set_process_title("swoole queue: worker process"); /** @var Worker $worker */ $worker = $this->container->make('queue.worker'); /** @var WorkerOptions $option */ $option = $this->container->make(WorkerOptions::class); $option->sleep = Arr::get($options, "sleep", 3); $option->maxTries = Arr::get($options, "tries", 0); $option->timeout = Arr::get($options, "timeout", 60); $timer = Timer::after($option->timeout * 1000, function () use ($process) { $process->exit(); }); $worker->runNextJob($connection, $queue, $option); Timer::clear($timer); }; } } /** * 注册事件 */ protected function listenForEvents() { $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) { $this->writeOutput($event->job); $this->logFailedJob($event); }); } /** * 记录失败任务 * @param JobFailed $event */ protected function logFailedJob(JobFailed $event) { $this->container['queue.failer']->log( $event->connection, $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); } /** * Write the status output for the queue worker. * * @param Job $job * @param $status */ protected function writeOutput(Job $job, $status) { switch ($status) { case 'starting': $this->writeStatus($job, 'Processing', 'comment'); break; case 'success': $this->writeStatus($job, 'Processed', 'info'); break; case 'failed': $this->writeStatus($job, 'Failed', 'error'); break; } } /** * Format the status output for the queue worker. * * @param Job $job * @param string $status * @param string $type * @return void */ protected function writeStatus(Job $job, $status, $type) { $this->output->writeln(sprintf( "<{$type}>[%s][%s] %s</{$type}> %s", date('Y-m-d H:i:s'), $job->getJobId(), str_pad("{$status}:", 11), $job->getName() )); } }
Add CrmebServiceProvider class
<?php namespace crmeb\swoole; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Contracts\Http\Kernel; use crmeb\swoole\command\HttpServerCommand; use Illuminate\Queue\Worker; use SwooleTW\Http\HttpServiceProvider; use SwooleTW\Http\Middleware\AccessLog; use SwooleTW\Http\Server\Manager; /** * Class CrmebServiceProvider * @package crmeb\swoole */ class CrmebServiceProvider extends HttpServiceProvider { /** * Register manager. * * @return void */ protected function registerManager() { $this->app->singleton(Manager::class, function ($app) { return new Manager($app, 'laravel'); }); $this->app->alias(Manager::class, 'swoole.manager'); $this->app->singleton('queue.worker', function ($app) { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); }; return new Worker( $app['queue'], $app['events'], $app[ExceptionHandler::class], $isDownForMaintenance ); }); } /** * Boot websocket routes. * * @return void */ protected function bootWebsocketRoutes() { require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php'; } /** * Register access log middleware to container. * * @return void */ protected function pushAccessLogMiddleware() { $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class); } /** * Register commands. */ protected function registerCommands() { $this->commands([ HttpServerCommand::class, ]); } /** * Merge configurations. */ protected function mergeConfigs() { $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http'); $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket'); } /** * Publish files of this package. */ protected function publishFiles() { $this->publishes([ base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'), base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'), base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'), ], 'laravel-swoole'); } }
Then add \ crmeb\swoole\CrmebServiceProvider::class
Put it into providers
in config/app.php
to load and rewrite the swoole command startup method
configuration config/swoole_http.php
return [ 'queue' => [ //是否开启自动消费队列 'enabled' => true, 'workers' => [ //队列名称 'CRMEB' => [] ] ],];
Enter the command:php artisan crmeb:http restart
After swoole is started, you can automatically consume the queue .
Related recommendations: The latest five Laravel video tutorials
The above is the detailed content of What should I do if the extension of laravel-swoole in laravel8 is not compatible with the message queue?. For more information, please follow other related articles on the PHP Chinese website!