検索
ホームページバックエンド開発PHPチュートリアルSwooleとRedisをベースにした同時キュー処理システム

バックグラウンド

PHP はマルチスレッドをサポートしていませんが、完全なシステムとしては、非同期で完了する必要がある操作が多数あります。これらの非同期操作を完了するために、Redis キュー タスク システムを構築しました。

ご存知のとおり、メッセージ キュー処理システムは主にコンシューマーとプロデューサーの 2 つの部分に分かれています。

私たちのシステムでは、メインシステムがプロデューサーとして機能し、タスクシステムがコンシューマーとして機能します。

具体的なワークフローは次のとおりです: 1. メイン システムは、処理する必要があるタスク名とタスク パラメーターをキューにプッシュします。 2. タスク システムは、タスク キューをリアルタイムでポップアウトし、サブプロセスをフォークし、サブプロセスが特定のタスク ロジックを完了します。

具体的なコードは次のとおりです:

/** * 启动守护进程 */public function runAction() {    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');    while (true) {        $this->fork_process();    }    exit;}/** * 创建子进程 */private function fork_process() {    $ppid = getmypid();    $pid = pcntl_fork();    if ($pid == 0) {//子进程        $pid = posix_getpid();        //echo "* Process {$pid} was created \n\n";        $this->mq_process();        exit;    } else {//主进程        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态        if (pcntl_wifexited($status)) {            //echo "\n\n* Sub process: {$pid} exited with {$status}";            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );        } else {            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');        }    }}/** * 业务任务队列处理 */private function mq_process() {    $data_pop = $this->masterRedis->rPop($this->redis_list_key);    $data = json_decode($data_pop, 1);    if (!$data) {        return FALSE;    }    $worker = '_task_' . $data['worker'];    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';    $params = $data['params'];    $class = new $class_name();    $class->$worker($params);    return TRUE;}

これは単純なタスク処理システムです。

このタスク システムは非同期実装の実現に役立ち、これまでのところ 1 年近く安定して実行されています。

しかし、残念ながら、これは単一プロセス システムです。常にフォークしており、タスクがあれば処理され、タスクがなければスキップされます。

これは非常に安定しています。

しかし、問題が 2 つあります。1 つは、常にフォークとポップを行うとサーバー リソースが無駄になるということです。もう 1 つは、同時実行がサポートされていないということです。

最初の問題は大丈夫ですが、2 番目の問題は非常に深刻です。

メインシステム が同時に多数のタスクをスローすると、タスクの処理時間は無限に延長されます。

新しいデザイン

同時実行の問題を解決するために、より効率的で強力なチーム処理システムを作成する予定です。

PHP7 より前はマルチスレッドがサポートされていなかったため、マルチプロセスを使用します。

インターネット上で多くの情報を見つけました。いわゆるマルチプロセスのほとんどは、バックグラウンドで同時に実行されている N 個のプロセスです。

明らかにこれは不適切です。

私の期待は次のとおりです:

タスクが飛び出すたびにタスクをフォークし、タスクの実行が完了した後に子プロセスが終了します。

発生した問題


1. プロセスの最大数を制御する方法

この問題は非常に単純です。つまり、子プロセスの各フォークがそれ自身を増加させます。子プロセスが完了すると一度デクリメントされます。

自動インクリメントには問題はありません。メインプロセスで操作するだけです。では、自分で減らすにはどうすればよいでしょうか?

おそらく、それは子プロセス内にあると言うかもしれません。ただし、ここで注意が必要です。フォークすると、リソースがメインプロセスから子プロセスにコピーされるため、メインプロセスのカウンタを子プロセスで操作することはできません。

つまり、ここで理解する必要がある知識点があります。それはシグナルです。

詳細については自分で Google で調べたり、ここでコードを直接確認したりできます。

// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));

これにより、シグナルプロセッサがインストールされます。もちろん、まだ足りないものが 1 つあります。

declare(ticks = 1);

declare は制御構造ステートメントです。具体的な使用法については Google を参照してください。

このコードの意味は、低レベルのステートメントが実行されるたびにシグナルプロセッサを呼び出すことです。

このようにして、子プロセスが終了するたびにシグナルプロセッサが呼び出され、シグナルプロセッサ内でそれをデクリメントすることができます。

2. プロセス残留問題の解決方法

マルチプロセス開発では、適切に処理されないとプロセス残留が発生します。

プロセス残留物の問題を解決するには、子プロセスをリサイクルする必要があります。

次に、子プロセスをどのようにリサイクルするかが技術的なポイントになります。

多くのブログ投稿を含む pcntl のデモでは、子プロセスがメインプロセスで再利用されていると言われています。

しかし、私たちは Redis の brpop に基づいており、brpop はブロックしています。

これにより問題が発生します。N 個のタスクを実行した後、タスク システムがアイドル状態のときにメイン プロセスがブロックされ、ブロックが発生したときに子プロセスがまだ実行中であるため、最後のいくつかの子プロセスのプロセス リサイクルを完了できません。 。 。

これはいつも複雑な場所でしたが、信号プロセッサを手に入れてからは非常に簡単になりました。

プロセスリサイクルも信号プロセッサに配置されます。

新システムの評価

pcntl はプロセス処理拡張機能ですが、残念ながらマルチプロセスのサポートが非常に弱いです。

ここでは Swoole 拡張機能のプロセスが使用されています。

具体的なコードは次のとおりです:

declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{    use Trait_Redis;    private $maxProcesses = 800;    private $child;    private $masterRedis;    private $redis_task_wing = 'task:wing'; //待处理队列    public function init(){        // install signal handler for dead kids        pcntl_signal(SIGCHLD, array($this, "sig_handler"));        set_time_limit(0);        ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection    }    private function redis_client(){        $rds = new Redis();        $rds->connect('redis.master.host',6379);        return $rds;    }    public function process(swoole_process $worker){// 第一个处理        $GLOBALS['worker'] = $worker;        swoole_event_add($worker->pipe, function($pipe) {            $worker = $GLOBALS['worker'];            $recv = $worker->read();            //send data to master            sleep(rand(1, 3));            echo "From Master: $recv\n";            $worker->exit(0);        });        exit;    }    public function testAction(){        for ($i = 0; $i < 10000; $i++){            $data = [                'abc' => $i,                'timestamp' => time().rand(100,999)            ];            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));        }        exit;    }    public function runAction(){        while (1){//            echo "\t now we de have $this->child child processes\n";            if ($this->child < $this->maxProcesses){                $rds = $this->redis_client();                $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待                if (!$data_pop){                    continue;                }                echo "\t Starting new child | now we de have $this->child child processes\n";                $this->child++;                $process = new swoole_process([$this, 'process']);                $process->write(json_encode($data_pop));                $pid = $process->start();            }        }    }    private function sig_handler($signo) {//        echo "Recive: $signo \r\n";        switch ($signo) {            case SIGCHLD:                while($ret = swoole_process::wait(false)) {//                    echo "PID={$ret['pid']}\n";                    $this->child--;                }        }    }}

最後に、テストの後、シングルコア 1G サーバーは 1 ~ 3 秒間タスクを実行することで 800 の同時実行性を達成できます。

追伸: マスターの皆様、私とコミュニケーションを取るのを歓迎します。私はもっと上手くできるだろうか〜


私はヤンおじさん、2 週間一生懸命働いてきた野生のプログラマーです

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
トラフィックの高いウェブサイトのPHPパフォーマンスチューニングトラフィックの高いウェブサイトのPHPパフォーマンスチューニングMay 14, 2025 am 12:13 AM

thesecrettokeepingaphp-poweredwebsterunningsmootlyunderheavyloadinvolvesseveralkeystrategies:1)emform opcodecoduceSciptionexecutiontime、2)aatabasequerycachingwithiThing withiThistolessendavasoload、

PHPでの依存関係注射:初心者向けのコード例PHPでの依存関係注射:初心者向けのコード例May 14, 2025 am 12:08 AM

コードをより明確かつ維持しやすくするため、依存関係が関心(DI)に注意する必要があります。 1)DIは、クラスを切り離すことにより、よりモジュール化されます。2)テストとコードの柔軟性の利便性を向上させ、3)DIコンテナを使用して複雑な依存関係を管理しますが、パフォーマンスの影響と円形の依存関係に注意してください。

PHPパフォーマンス:アプリケーションを最適化することは可能ですか?PHPパフォーマンス:アプリケーションを最適化することは可能ですか?May 14, 2025 am 12:04 AM

はい、最適化されたAphPossibleandessention.1)CachingingusapCutoredatedAtabaseload.2)最適化、効率的なQueries、およびConnectionPooling.3)EnhcodeCodewithBultinctions、Avoididingglobalbariables、およびUsingopcodeching

PHPパフォーマンスの最適化:究極のガイドPHPパフォーマンスの最適化:究極のガイドMay 14, 2025 am 12:02 AM

keyStrategIestsoSificlyvoostphpappliceperformanceare:1)useopcodecachinglikeToreexecutiontime、2)最適化abaseの相互作用とプロペラインデックス、3)3)構成

PHP依存性噴射コンテナ:クイックスタートPHP依存性噴射コンテナ:クイックスタートMay 13, 2025 am 12:11 AM

aphpDependencyInjectionContaineriSATOULTAINATINAGECLASSDEPTINCIES、強化測定性、テスト可能性、および維持可能性。

PHPの依存噴射対サービスロケーターPHPの依存噴射対サービスロケーターMay 13, 2025 am 12:10 AM

SELECT DEPENTENCINGINOFCENT(DI)大規模なアプリケーションの場合、ServicElocatorは小さなプロジェクトまたはプロトタイプに適しています。 1)DIは、コンストラクターインジェクションを通じてコードのテスト可能性とモジュール性を改善します。 2)ServiceLocatorは、センター登録を通じてサービスを取得します。これは便利ですが、コードカップリングの増加につながる可能性があります。

PHPパフォーマンス最適化戦略。PHPパフォーマンス最適化戦略。May 13, 2025 am 12:06 AM

phpapplicationscanbeoptimizedforspeedandEfficiencyby:1)enabingopcacheinphp.ini、2)PreparedStatementswithpordatabasequeriesを使用して、3)LoopswithArray_filterandarray_mapfordataprocessing、4)の構成ngincasaSearverseproxy、5)

PHPメールの検証:電子メールが正しく送信されるようにしますPHPメールの検証:電子メールが正しく送信されるようにしますMay 13, 2025 am 12:06 AM

PHPemailvalidationinvolvesthreesteps:1)Formatvalidationusingregularexpressionstochecktheemailformat;2)DNSvalidationtoensurethedomainhasavalidMXrecord;3)SMTPvalidation,themostthoroughmethod,whichchecksifthemailboxexistsbyconnectingtotheSMTPserver.Impl

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

SublimeText3 英語版

SublimeText3 英語版

推奨: Win バージョン、コードプロンプトをサポート!

EditPlus 中国語クラック版

EditPlus 中国語クラック版

サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

VSCode Windows 64 ビットのダウンロード

VSCode Windows 64 ビットのダウンロード

Microsoft によって発売された無料で強力な IDE エディター

Dreamweaver Mac版

Dreamweaver Mac版

ビジュアル Web 開発ツール

AtomエディタMac版ダウンロード

AtomエディタMac版ダウンロード

最も人気のあるオープンソースエディター