ホームページ >バックエンド開発 >PHPチュートリアル >SwooleとRedisをベースにした同時キュー処理システム

SwooleとRedisをベースにした同時キュー処理システム

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBオリジナル
2016-06-23 13:08:401726ブラウズ

バックグラウンド

PHP はマルチスレッドをサポートしていませんが、完全なシステムとしては、非同期で完了する必要がある操作が多数あります。これらの非同期操作を完了するために、Redis キュー タスク システムを構築しました。

ご存知のとおり、メッセージ キュー処理システムは主にコンシューマーとプロデューサーの 2 つの部分に分かれています。

私たちのシステムでは、メインシステムがプロデューサーとして機能し、タスクシステムがコンシューマーとして機能します。

具体的なワークフローは次のとおりです: 1. メイン システムは、処理する必要があるタスク名とタスク パラメーターをキューにプッシュします。 2. タスク システムは、タスク キューをリアルタイムでポップアウトし、サブプロセスをフォークし、サブプロセスが特定のタスク ロジックを完了します。

具体的なコードは次のとおりです:

/** * 启动守护进程 */public function runAction() {    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');    while (true) {        $this->fork_process();    }    exit;}/** * 创建子进程 */private function fork_process() {    $ppid = getmypid();    $pid = pcntl_fork();    if ($pid == 0) {//子进程        $pid = posix_getpid();        //echo "* Process {$pid} was created \n\n";        $this->mq_process();        exit;    } else {//主进程        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态        if (pcntl_wifexited($status)) {            //echo "\n\n* Sub process: {$pid} exited with {$status}";            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );        } else {            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');        }    }}/** * 业务任务队列处理 */private function mq_process() {    $data_pop = $this->masterRedis->rPop($this->redis_list_key);    $data = json_decode($data_pop, 1);    if (!$data) {        return FALSE;    }    $worker = '_task_' . $data['worker'];    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';    $params = $data['params'];    $class = new $class_name();    $class->$worker($params);    return TRUE;}

これは単純なタスク処理システムです。

このタスク システムは非同期実装の実現に役立ち、これまでのところ 1 年近く安定して実行されています。

しかし、残念ながら、これは単一プロセス システムです。常にフォークしており、タスクがあれば処理され、タスクがなければスキップされます。

これは非常に安定しています。

しかし、問題が 2 つあります。1 つは、常にフォークとポップを行うとサーバー リソースが無駄になるということです。もう 1 つは、同時実行がサポートされていないということです。

最初の問題は大丈夫ですが、2 番目の問題は非常に深刻です。

メインシステム が同時に多数のタスクをスローすると、タスクの処理時間は無限に延長されます。

新しいデザイン

同時実行の問題を解決するために、より効率的で強力なチーム処理システムを作成する予定です。

PHP7 より前はマルチスレッドがサポートされていなかったため、マルチプロセスを使用します。

インターネット上で多くの情報を見つけました。いわゆるマルチプロセスのほとんどは、バックグラウンドで同時に実行されている N 個のプロセスです。

明らかにこれは不適切です。

私の期待は次のとおりです:

タスクが飛び出すたびにタスクをフォークし、タスクの実行が完了した後に子プロセスが終了します。

発生した問題


1. プロセスの最大数を制御する方法

この問題は非常に単純です。つまり、子プロセスの各フォークがそれ自身を増加させます。子プロセスが完了すると一度デクリメントされます。

自動インクリメントには問題はありません。メインプロセスで操作するだけです。では、自分で減らすにはどうすればよいでしょうか?

おそらく、それは子プロセス内にあると言うかもしれません。ただし、ここで注意が必要です。フォークすると、リソースがメインプロセスから子プロセスにコピーされるため、メインプロセスのカウンタを子プロセスで操作することはできません。

つまり、ここで理解する必要がある知識点があります。それはシグナルです。

詳細については自分で Google で調べたり、ここでコードを直接確認したりできます。

// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));

これにより、シグナルプロセッサがインストールされます。もちろん、まだ足りないものが 1 つあります。

declare(ticks = 1);

declare は制御構造ステートメントです。具体的な使用法については Google を参照してください。

このコードの意味は、低レベルのステートメントが実行されるたびにシグナルプロセッサを呼び出すことです。

このようにして、子プロセスが終了するたびにシグナルプロセッサが呼び出され、シグナルプロセッサ内でそれをデクリメントすることができます。

2. プロセス残留問題の解決方法

マルチプロセス開発では、適切に処理されないとプロセス残留が発生します。

プロセス残留物の問題を解決するには、子プロセスをリサイクルする必要があります。

次に、子プロセスをどのようにリサイクルするかが技術的なポイントになります。

多くのブログ投稿を含む pcntl のデモでは、子プロセスがメインプロセスで再利用されていると言われています。

しかし、私たちは Redis の brpop に基づいており、brpop はブロックしています。

これにより問題が発生します。N 個のタスクを実行した後、タスク システムがアイドル状態のときにメイン プロセスがブロックされ、ブロックが発生したときに子プロセスがまだ実行中であるため、最後のいくつかの子プロセスのプロセス リサイクルを完了できません。 。 。

これはいつも複雑な場所でしたが、信号プロセッサを手に入れてからは非常に簡単になりました。

プロセスリサイクルも信号プロセッサに配置されます。

新システムの評価

pcntl はプロセス処理拡張機能ですが、残念ながらマルチプロセスのサポートが非常に弱いです。

ここでは Swoole 拡張機能のプロセスが使用されています。

具体的なコードは次のとおりです:

declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{    use Trait_Redis;    private $maxProcesses = 800;    private $child;    private $masterRedis;    private $redis_task_wing = 'task:wing'; //待处理队列    public function init(){        // install signal handler for dead kids        pcntl_signal(SIGCHLD, array($this, "sig_handler"));        set_time_limit(0);        ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection    }    private function redis_client(){        $rds = new Redis();        $rds->connect('redis.master.host',6379);        return $rds;    }    public function process(swoole_process $worker){// 第一个处理        $GLOBALS['worker'] = $worker;        swoole_event_add($worker->pipe, function($pipe) {            $worker = $GLOBALS['worker'];            $recv = $worker->read();            //send data to master            sleep(rand(1, 3));            echo "From Master: $recv\n";            $worker->exit(0);        });        exit;    }    public function testAction(){        for ($i = 0; $i < 10000; $i++){            $data = [                'abc' => $i,                'timestamp' => time().rand(100,999)            ];            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));        }        exit;    }    public function runAction(){        while (1){//            echo "\t now we de have $this->child child processes\n";            if ($this->child < $this->maxProcesses){                $rds = $this->redis_client();                $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待                if (!$data_pop){                    continue;                }                echo "\t Starting new child | now we de have $this->child child processes\n";                $this->child++;                $process = new swoole_process([$this, 'process']);                $process->write(json_encode($data_pop));                $pid = $process->start();            }        }    }    private function sig_handler($signo) {//        echo "Recive: $signo \r\n";        switch ($signo) {            case SIGCHLD:                while($ret = swoole_process::wait(false)) {//                    echo "PID={$ret['pid']}\n";                    $this->child--;                }        }    }}

最後に、テストの後、シングルコア 1G サーバーは 1 ~ 3 秒間タスクを実行することで 800 の同時実行性を達成できます。

追伸: マスターの皆様、私とコミュニケーションを取るのを歓迎します。私はもっと上手くできるだろうか〜


私はヤンおじさん、2 週間一生懸命働いてきた野生のプログラマーです

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。