ホームページ >バックエンド開発 >PHPチュートリアル >kestrel php メッセージ キューを再印刷する
转下kestrel php 消息队列
私たちはしばらく仕事で Twitter の kestrel キュー サーバーを使用していますが、Python で書かれたサービス層からのみ使用しています。 PHP で記述されたアプリケーション層からのキューのニーズがいくつかあるため、今週数日かけて Web アプリケーションにキューのサポートを追加しました。私が学んだことと、それをどのように実践したかを共有したいと思いました。
目標
Kestrel サーバー自体は非常に簡単に起動して実行できました。唯一指摘したいのは、マスターを使用しようとしたときはかなり不安定だったので、リリースブランチに固執することをお勧めするということです。クライアントの実装に関して、開始時に念頭に置いていた目標がいくつかありました。
? Kestrel は memcache プロトコルに基づいて構築されているため、memcache クライアントを最初から構築するのではなく、既存の memcache クライアントを活用してみてください
???以前ここで説明した既存のバッチ ジョブ インフラストラクチャを活用し、マルチテナントのニーズが確実に満たされるようにします
???後でキューサーバーを変更する場合に備えて、キューインターフェースを汎用のままにしておきます
???既存のケストレル管理ツールを利用し、必要な機能のみを構築します
これらの目標を念頭に置いて、Kestrel クライアント、プロデューサー、コンシューマー、そしてコンシューマーを実行するための非常に小さな CLI ハーネスの 4 つのコンポーネントを完成しました。しかし、何もコーディングする前に、同僚の Matt Erkkila が書いた Kestrel の Web UI である Kestrel Web をセットアップしました。 Kestrel Web を使用すると、Kestrel の統計を表示したり、キューを管理したり、手動入力に基づいてキューを並べ替えたりフィルターしたりすることができます。このツールを最初から起動して実行することで、ジョブがテスト キューに追加および消費されるのを簡単に確認でき、必要に応じてキューを簡単にフラッシュすることもできました。
Kestrel クライアント
PHP 用の既存の Kestrel クライアントが見つからなかったので、2 つの memcache 拡張機能を調べ始めました。古い memcache と Andrei Zmievski の memcached で、後者は libmemcached ライブラリに基づいています。 memcache から始めましたが、最初は問題なく動作していましたが、タイムアウトを変更できないことがすぐにわかりました。これは道の邪魔ですか? kestrel は新しいジョブをポーリングすることを推奨しています。ポーリング タイムアウトを 1 秒以上 (memcache のデフォルト) に設定しようとすると、memcache 拡張機能からのタイムアウト エラーが表示されます。 memcached 拡張機能にはこれらの問題がないため、これを使用しました。
最初に遭遇した問題は連載でした。 memcached のシリアライザーを使用して kestrel に書き込むことはできますが、データを読み戻すときに、シリアライザーであることが認識されません。したがって、クライアントでデータを手動でシリアル化するだけで、問題なく動作します。もう 1 つ注意すべき点は、memcached 拡張機能はデフォルトで 100 バイトを超えるものを自動的に圧縮し、kestrel から読み取るときに解凍しないため、圧縮を無効にするか手動で行う必要があることです。
もう 1 つの問題は、カスタム Kestrel コマンドを使用したくても使用できないことです。アプリケーション層には特別なことは何も必要ないため、memcached 拡張機能は問題なく機能します。 Kestrel 2 で今後のモニター (バッチ処理) のサポートが必要になったら、Kestrel クライアントを最初から実装する必要があるかもしれません。 Kestrel Web は、私たちが現在必要としているものをすべて提供します。
memcached を使用することが決定したら、そのためのライト デコレータ EC_KestrelClient を作成しました。これは、memcached クライアントのインスタンス化、シリアル化、および GET コマンドに対するいくつかの kestrel 固有のオプションのヘルパーを処理します。また、memcached 固有のオプションを渡すサポートもあります。クラスは最終的に次のようになりました:
?
<?php /** * A thin kestrel client that wraps Memcached (libmemcached extension) * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_KestrelClient { /** * The Memcached instance * * @var Memcached */ protected $_memcached = null; /** * The Kestrel server IP * * @var string */ protected $_host = '127.0.0.1'; /** * The Kestrel server port * * @var string */ protected $_port = 22133; /** * Optional options, not currently used * * @var array */ protected $_options = array(); /** * Sets the host, port, and options to be used * * @param string $host The host to use, defaults to 127.0.0.1 * @param int $port The port to use, defaults to 22133 * @param array $options Memcached options, not currently used * * @return void */ public function __construct( $host = '127.0.0.1', $port = 22133, array $options = array() ) { $this->_host = $host; $this->_port = $port; $this->setOptions($options); } /** * Sets job data on the queue, json_encoding the value to avoid problematic * serialization. * * @param string $queue The queue name * @param mixed $data The data to store * * @return bool */ public function set($queue, $data) { // Local json serialization, as kestrel doesn't send serialization flags return $this->getMemcached()->set($queue, json_encode($data)); } /** * Reliably read an item off of the queue. Meant to be run in a loop, and * call closeReliableRead() when done to make sure the final job is not left * on the queue. * * @param mixed $queue The queue name to read from * @param int $timeout The timeout to wait for a job to appear * * @return array|false * @see closeReliableRead() */ public function reliableRead($queue, $timeout = 1000) { $queue = $queue . '/close/open/t=' . $timeout; $result = $this->getMemcached()->get($queue); if ($result === false) { return $result; } // Local json serialization, as kestrel doesn't send serialization flags return json_decode($result, true); } /** * Closes any existing open read * * @param string $queue The queue name * * @return false */ public function closeReliableRead($queue) { $queue = $queue . '/close'; return $this->getMemcached()->get($queue); } /** * Aborts an existing reliable read * * @param string $queue The queue name * * @return false */ public function abortReliableRead($queue) { $queue = $queue . '/abort'; return $this->getMemcached()->get($queue); } /** * Set an option to be used with the Memcached client. Not used. * * @param string $name The option name * @param value $value The option value * * @return void */ public function setOption($name, $value) { $this->_options[$name] = $value; } /** * Sets multiple options * * @param array $options Array of key/values to set * * @return void */ public function setOptions(array $options) { foreach ($options as $name => $value) { $this->setOption($name, $value); } } /** * Gets a current option's value * * @param string $name The option name * * @return mixed */ public function getOption($name) { if (isset($this->_options[$name])) { return $this->_options[$name]; } return null; } /** * Gets all current options * * @return array */ public function getOptions() { return $this->_options; } /** * Gets a singleton instance of the Memcached client * * @return Memcached */ public function getMemcached() { if ($this->_memcached === null) { $this->_initMemcached(); } return $this->_memcached; } /** * Initialized the Memcached client instance * * @return void */ protected function _initMemcached() { $this->_memcached = $this->_getMemcachedInstance(); foreach ($this->_options as $option => $value) { $this->_memcached->setOption($option, $value); } $this->_memcached->addServer($this->_host, $this->_port); $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false); } // @codeCoverageIgnoreStart /** * Returns a new instance of Memcached. Abstracted for testing. * * @return Memcached */ protected function _getMemcachedInstance() { return new Memcached(); } // @codeCoverageIgnoreEnd }
?
?
生の EC_KestrelClient.php を表示する この要点は GitHub から提供されています。
プロデューサー
プロデューサーはとてもシンプルです。現在のテナント情報を含む標準構造にデータをフォーマットし、他のプロジェクトと衝突しないようにキューに名前空間を設定し、キューに追加するだけです。プロデューサーは次のようになります:
?
<?php /** * Interface for adding jobs to a queue server * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_Producer { /** * Adds a job onto a queue * * @param string $queue The queue name to add a job to * @param string $jobName The job name for the consumer to run * @param mixed $data Optional additional data to pass to the job * * @return bool */ public function addJob($queue, $jobName, $data = null) { $item = array( 'instance' => EC::getCurrentInstanceName(), 'jobName' => $jobName ); if ($data !== null) { $item['data'] = $data; } // Namespace queue with project $queue = 'enterprise_' . $queue; $client = $this->_getKestrelClient(); return $client->set($queue, $item); } // @codeCoverageIgnoreStart /** * Gets a single instance of EC_KestrelClient. Abstracted for testing. * * @return void */ protected function _getKestrelClient() { if (APPLICATION_ENV === 'testing') { throw new Exception(__METHOD__ . ' was not mocked when testing'); } static $client = null; if ($client === null) { $host = EC::getConfigOption('kestrel.host'); $port = EC::getConfigOption('kestrel.port'); $client = new EC_KestrelClient($host, $port); } return $client; } // @codeCoverageIgnoreEnd }
?
?
?
生の EC_Producer.php を表示する この要点は GitHub によって提供されています。
消費者
消費者にはもう少し理解する必要がありますが、それでも非常に簡単です。これは、daemontools や Supervisord などの監視ツールから実行することを目的としているため、CLI 引数を EC_Consumer に渡して実行するだけの非常に小さな CLI ハーネスがあります。 CLI 引数を解析した後、EC_Consumer は新しいジョブを求めて kestrel をポーリングし、標準のバッチ ジョブ インフラストラクチャを通じてそれらを実行します。 PHP の長時間実行プロセス能力にさらに自信が持てるまで、オプションの maxium jobs 引数を追加しました。これにより、コンシューマが X 個を超えるジョブを処理するのを停止して終了します。監視サービス (supervisord) は数秒以内に再起動します。また、テスト用にオプションのデバッグ引数も追加したので、発生するすべてのアクションを確認できます。 CLI ハーネスは次のようになります:
?
#!/bin/env php <?php // External application bootstrapping require_once __DIR__ . '/cli_init.php'; // Instantiate and run the consumer $consumer = new EC_Consumer($argv); $consumer->run();
?
view raw consumer_cli.php This Gist brought to you by GitHub.
And the main consumer class, EC_Consumer, looks something like this:
<?php /** * Enterprise queue consumer interface, called by bin/consumer_cli.php * * @author Bill Shupp <hostmaster@shupp.org> * @copyright 2010-2011 Empower Campaigns */ class EC_Consumer { /** * Instance of {@link Zend_Console_Getopt} * * @var Zend_Console_Getopt */ protected $_opt = null; /** * Which APPLICATION_ENV to run under (see -e) * * @var string */ protected $_environment = null; /** * The kestrel server IP * * @var string */ protected $_host = null; /** * The kestrel server port * * @var int */ protected $_port = null; /** * The kestrel queue name to connect to * * @var string */ protected $_queue = null; /** * Whether we should show debug output * * @var bool */ protected $_debug = false; /** * Maximum # of jobs for this process to perform (for memory fail safe) * * @var int */ protected $_maxJobs = null; /** * Current job count * * @var int */ protected $_jobCount = 0; /** * Parses arguments from the command line and does error handling * * @param array $argv The $argv from bin/ecli.php * * @throw Zend_Console_Getopt_Exception on failure * @return void */ public function __construct(array $argv) { try { $opt = new Zend_Console_Getopt( array( 'environment|e=s' => 'environment name (e.g. development)' . ', required', 'server|s=s' => 'kestrel server, format of host:port' . ', required', 'queue|q=s' => 'queue name (e.g. crawler_campaign)' . ', required', 'max-jobs|m=s' => 'max jobs to run before exiting' . ', optional', 'debug|d' => 'show debug output' . ', optional', ) ); $opt->setArguments($argv); $opt->parse(); // Set environment if ($opt->e === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing environment' ); } $this->_environment = $opt->e; // @codeCoverageIgnoreStart if (!defined('APPLICATION_ENV')) { define('APPLICATION_ENV', $this->_environment); } // @codeCoverageIgnoreEnd // Set server if ($opt->s === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing server' ); } $parts = explode(':', $opt->s); if (count($parts) !== 2) { throw new Zend_Console_Getopt_Exception( 'Error: invalid server: ' . $opt->s ); } $this->_host = $parts[0]; $this->_port = $parts[1]; // Set queue if ($opt->q === null) { throw new Zend_Console_Getopt_Exception( 'Error: missing queue' ); } $this->_queue = $opt->q; // Set max-jobs if ($opt->m !== null) { $this->_maxJobs = $opt->m; } // Set debug if ($opt->d !== null) { $this->_debug = true; } } catch (Zend_Console_Getopt_Exception $e) { echo "\n" . $e->getMessage() . "\n\n"; echo $opt->getUsageMessage(); // @codeCoverageIgnoreStart if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') { exit(1); } // @codeCoverageIgnoreEnd } $this->_opt = $opt; } /** * Polls the queue server for jobs and runs them as they come in * * @return void */ public function run() { $client = $this->_getKestrelClient(); $queue = 'enterprise_' . $this->_queue; while ($this->_keepRunning()) { // Pull job from queue $job = $client->reliableRead($queue, 500); if ($job === false) { $this->_debug('Nothing on queue ' . $queue); continue; } if (!isset($job['instance'])) { echo 'Instance not set in queue job: ' . print_r($job, true); continue; } $instance = $job['instance']; if (!isset($job['jobName'])) { echo 'Job name not set in queue job: ' . print_r($job, true); continue; } $jobName = $job['jobName']; $data = null; if (isset($job['data'])) { $data = $job['data']; } // Run the job $returnCode = $this->runJob($instance, $jobName, $data); if ($returnCode !== 0) { $client->abortReliableRead($queue); continue; } } $client->closeReliableRead($queue); } /** * Runs the job via bin/ecli.php * * @param string $instance The instance name to run the job under * @param string $jobName The job name * @param string $data Optional extra data * * @return int */ public function runJob($instance, $jobName, $data) { $cmd = BASE_PATH . '/bin/ecli.php ' . '-e ' . $this->_environment . ' -i ' . $instance . ' -j ' . $jobName; if ($data) { $cmd .= " '" . base64_encode(json_encode($data)) . "'"; } $returnCode = $this->_passthru($cmd); $this->_jobCount++; $this->_debug('Job count: ' . $this->_jobCount); return $returnCode; } /** * Check to see if the job limit has been reached * * @return bool */ protected function _keepRunning() { return ($this->_maxJobs === null) ? true : ($this->_jobCount < $this->_maxJobs); } /** * Show debug messages * * @param mixed $message * * @return void */ protected function _debug($message) { if (!$this->_debug) { return; } echo $message . "\n"; } // @codeCoverageIgnoreStart /** * Calls the passthru() function and returns the exit code. Abstracted * for testing. * * @param string $cmd The command to execute * * @return int */ protected function _passthru($cmd) { passthru($cmd, $returnCode); return $returnCode; } /** * Gets a single instance of EC_KestrelClient. Abstracted for testing. * * @return void */ protected function _getKestrelClient() { if (APPLICATION_ENV === 'testing') { throw new Exception(__METHOD__ . ' was not mocked when testing'); } return new EC_KestrelClient($this->_host, $this->_port); } // @codeCoverageIgnoreEnd }
?
?
view raw EC_Consumer.php This Gist brought to you by GitHub.
Putting it together
Now that all the pieces are put together, let's take a look at in action. Adding example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:
<?php $producer = new EC_Producer(); $producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar')); ?> view raw gistfile1.php This Gist brought to you by GitHub.
?
?
And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:
./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
??? [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
view raw example.txt This Gist brought to you by GitHub.
That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.