Repost: kestrel PHP message queue

We've been using Twitter's kestrel queue server for a while now at work, but only from our service layer, which is written in Python. Now that we have some queueing needs from our application layer, written in PHP, I spent a few days this week adding queue support to our web application. I thought I'd share what I learned and how I implemented it.

Goals

The kestrel server itself was pretty straightforward to get up and running. The only thing I would point out is that I recommend sticking to release branches, as master was fairly unstable when I tried to use it. For the client implementation, I had a few goals in mind when I started:

- Since kestrel is built on the memcache protocol, try to leverage an existing memcache client rather than build one from scratch
- Utilize our existing batch job infrastructure, which I covered in a previous post, and make sure our multi-tenant needs are met
- Keep the queue interface generic in case we change queue servers later
- Utilize existing kestrel management tools, and only build out the functionality we need
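
One way to keep the third goal honest is to code the producer and consumer against a small interface and keep the kestrel-specific details behind it. The interface and the in-memory implementation below are hypothetical and only illustrate the shape. They are not the classes described later in this post.

```php
<?php
// Hypothetical queue abstraction; all names here are illustrative only.
interface QueueClient
{
    /** Adds an item to the named queue. */
    public function push(string $queue, array $item): bool;

    /** Returns the next item, or false when the queue is empty. */
    public function pop(string $queue);
}

// In-memory implementation, handy for tests or local development.
class InMemoryQueueClient implements QueueClient
{
    private $queues = array();

    public function push(string $queue, array $item): bool
    {
        $this->queues[$queue][] = $item;
        return true;
    }

    public function pop(string $queue)
    {
        if (empty($this->queues[$queue])) {
            return false;
        }
        return array_shift($this->queues[$queue]);
    }
}

$client = new InMemoryQueueClient();
$client->push('hello_world', array('jobName' => 'HelloWorld'));
print_r($client->pop('hello_world'));
var_dump($client->pop('hello_world')); // bool(false)
```

Swapping queue servers would then mean writing another implementation of the same interface rather than touching the callers.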

With these goals in mind, I ended up with four components: a kestrel client, a producer, a consumer, and a very small CLI harness for running the consumer. But before I coded anything, I set up kestrel web, a web UI for kestrel written by my co-worker Matt Erkkila. Kestrel web allows you to view statistics on kestrel, manage queues, and sort and filter queues based on manual inputs. Having this tool up and running from the get-go made it easy to watch jobs get added to and consumed from my test queue, and to flush out the queues as needed.

The Kestrel Client

I couldn't find any existing kestrel clients for PHP, so I started looking at the two memcache extensions: the older memcache, and Andrei Zmievski's memcached, the latter of which is based on the libmemcached library. I started with memcache, and while it worked fine initially, I quickly found that I could not modify timeouts. This interfered with the way kestrel recommends you poll it for new jobs, and I would see timeout errors from the memcache extension whenever I set the poll timeout to 1 second or higher (1 second being the memcache extension's default timeout). The memcached extension does not have these issues, so I went with it.
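
For context, kestrel's memcache-style GET takes its options as slash-separated suffixes on the queue name: `/t=<milliseconds>` to wait for a job to arrive, and `/open`, `/close`, and `/abort` for reliable reads. The sketch below only builds those key strings. The function name `kestrelReadKey` is made up for illustration and is not part of either extension.

```php
<?php
/**
 * Builds the key passed to a memcache-style GET against kestrel.
 * Hypothetical helper for illustration only.
 *
 * @param string   $queue     The queue name
 * @param int|null $timeoutMs How long kestrel should wait for a job, in ms
 * @param bool     $reliable  Close the previous read and open a new one
 *
 * @return string
 */
function kestrelReadKey(string $queue, ?int $timeoutMs = null, bool $reliable = false): string
{
    $key = $queue;
    if ($reliable) {
        // Confirm the previously opened job, then open the next one
        $key .= '/close/open';
    }
    if ($timeoutMs !== null) {
        // The timeout is expressed in milliseconds
        $key .= '/t=' . $timeoutMs;
    }
    return $key;
}

echo kestrelReadKey('jobs', 500), "\n";        // jobs/t=500
echo kestrelReadKey('jobs', 1000, true), "\n"; // jobs/close/open/t=1000
```

Because the wait happens on the server side, the client's own read timeout has to be at least as long as the `/t=` value, which is why being able to adjust client timeouts matters.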

The first gotcha I ran into was serialization. You can use memcached's serializer for writing to kestrel, but when it reads the data back, it doesn't recognize that it is serialized. So I just serialize the data manually in my client, and things work fine. One other thing to note is that you'll want to disable compression, or do it manually, as the memcached extension will automatically compress anything over 100 bytes by default, and will not decompress it when reading from kestrel.
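
A minimal sketch of the manual approach: encode the job to a plain string before writing and decode it after reading, so nothing depends on flags that kestrel does not send back. The function names are illustrative. With the real extension you would also call `setOption(Memcached::OPT_COMPRESSION, false)` on the client so values are stored uncompressed.

```php
<?php
// Encode a job payload to a plain string before handing it to the client.
function encodeJob($data): string
{
    $json = json_encode($data);
    if ($json === false) {
        throw new InvalidArgumentException('Job data could not be JSON encoded');
    }
    return $json;
}

// Decode a string read from the queue; false means nothing was read.
function decodeJob($raw)
{
    if ($raw === false) {
        return false;
    }
    return json_decode($raw, true);
}

$job     = array('jobName' => 'HelloWorld', 'data' => array('foo' => 'bar'));
$payload = encodeJob($job);
echo $payload, "\n";
var_dump(decodeJob($payload) === $job); // bool(true)
```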

The other issue is that if you want to use any custom kestrel commands, you can't. Since the application layer doesn't need anything fancy, the memcached extension will work fine for it. Once we need support for the upcoming monitor (batching) command in kestrel 2, we may need to implement a kestrel client from scratch. Kestrel web supplies everything else we need right now.

Once the decision was made to use memcached, I wrote a light decorator for it, EC_KestrelClient. This handles instantiation of the memcached client, serialization, and helpers for some kestrel-specific options to the GET command. It also supports passing memcached-specific options through. The class ended up looking like this:

<?php
/**
 * A thin kestrel client that wraps Memcached (libmemcached extension)
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance
     *
     * @var Memcached
     */
    protected $_memcached = null;

    /**
     * The Kestrel server IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var int
     */
    protected $_port = 22133;

    /**
     * Optional Memcached options, applied when the client is initialized
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options
     *
     * @return void
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    )
    {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        return $this->getMemcached()->set($queue, json_encode($data));
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * @param string $queue   The queue name to read from
     * @param int    $timeout The timeout (in ms) to wait for a job to appear
     *
     * @return array|false
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        $queue  = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($queue);
        if ($result === false) {
            return $result;
        }

        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function closeReliableRead($queue)
    {
        $queue = $queue . '/close';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Aborts an existing reliable read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function abortReliableRead($queue)
    {
        $queue = $queue . '/abort';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Set an option to be used with the Memcached client.
     *
     * @param string $name  The option name
     * @param mixed  $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param string $name The option name
     *
     * @return mixed
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }
        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets a singleton instance of the Memcached client
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }
        return $this->_memcached;
    }

    /**
     * Initializes the Memcached client instance
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);
        // Kestrel does not return flags, so compressed values would not be
        // decompressed on read
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}

The Producer

The producer is very simple. It formats the data into a standard structure that includes the current tenant information, namespaces the queue so it doesn't collide with other projects, and adds the job to the queue. The producer looks like this:

<?php
/**
 * Interface for adding jobs to a queue server
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Producer
{
    /**
     * Adds a job onto a queue
     *
     * @param string $queue   The queue name to add a job to
     * @param string $jobName The job name for the consumer to run
     * @param mixed  $data    Optional additional data to pass to the job
     *
     * @return bool
     */
    public function addJob($queue, $jobName, $data = null)
    {
        $item = array(
            'instance' => EC::getCurrentInstanceName(),
            'jobName'  => $jobName
        );
        if ($data !== null) {
            $item['data'] = $data;
        }

        // Namespace queue with project
        $queue  = 'enterprise_' . $queue;
        $client = $this->_getKestrelClient();
        return $client->set($queue, $item);
    }

    // @codeCoverageIgnoreStart
    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }

        static $client = null;
        if ($client === null) {
            $host   = EC::getConfigOption('kestrel.host');
            $port   = EC::getConfigOption('kestrel.port');
            $client = new EC_KestrelClient($host, $port);
        }
        return $client;
    }
    // @codeCoverageIgnoreEnd
}
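
The `_getKestrelClient()` seam is what makes the producer testable without a running kestrel server. The sketch below shows the idea with simplified stand-in classes. `RecordingClient`, `SketchProducer`, and `TestableProducer` are hypothetical names, and the tenant name is passed in as an argument rather than read from `EC::getCurrentInstanceName()`.

```php
<?php
// Stand-in queue client that records writes instead of sending them.
class RecordingClient
{
    public $writes = array();

    public function set($queue, $data)
    {
        $this->writes[] = array($queue, $data);
        return true;
    }
}

// Simplified producer with the same shape as the one above.
class SketchProducer
{
    public function addJob($instance, $queue, $jobName, $data = null)
    {
        $item = array('instance' => $instance, 'jobName' => $jobName);
        if ($data !== null) {
            $item['data'] = $data;
        }
        // Namespace the queue with the project name
        return $this->_getClient()->set('enterprise_' . $queue, $item);
    }

    protected function _getClient()
    {
        throw new RuntimeException(__METHOD__ . ' must be overridden in this sketch');
    }
}

// Test double: overrides only the client factory method.
class TestableProducer extends SketchProducer
{
    public $client;

    public function __construct()
    {
        $this->client = new RecordingClient();
    }

    protected function _getClient()
    {
        return $this->client;
    }
}

$producer = new TestableProducer();
$producer->addJob('dev', 'hello_world', 'HelloWorld', array('foo' => 'bar'));
print_r($producer->client->writes[0]);
```

A mocking framework can do the same job; the subclass is just the smallest way to show what gets swapped out.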

The Consumer

The consumer has a bit more to it, though it is still pretty straightforward. It's intended to be run from a monitoring tool like daemontools or supervisord, so there is a very small CLI harness that just passes the CLI arguments into EC_Consumer and runs it. After parsing the CLI arguments, EC_Consumer polls kestrel for new jobs, and runs them through our standard batch job infrastructure. Until we have more confidence in PHP's ability to handle long-running processes, I added an optional maximum jobs argument, which makes the consumer terminate after it has processed X jobs. The monitoring service (supervisord) will then just restart it in a matter of seconds. I also added an optional debug argument for testing, so you can see every action as it happens. The CLI harness looks like this:

#!/usr/bin/env php
<?php
// External application bootstrapping
require_once __DIR__ . '/cli_init.php';

// Instantiate and run the consumer
$consumer = new EC_Consumer($argv);
$consumer->run();

And the main consumer class, EC_Consumer, looks something like this:

<?php
/**
 * Enterprise queue consumer interface, called by bin/consumer_cli.php
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Consumer
{
    /**
     * Instance of {@link Zend_Console_Getopt}
     *
     * @var Zend_Console_Getopt
     */
    protected $_opt = null;

    /**
     * Which APPLICATION_ENV to run under (see -e)
     *
     * @var string
     */
    protected $_environment = null;

    /**
     * The kestrel server IP
     *
     * @var string
     */
    protected $_host = null;

    /**
     * The kestrel server port
     *
     * @var int
     */
    protected $_port = null;

    /**
     * The kestrel queue name to connect to
     *
     * @var string
     */
    protected $_queue = null;

    /**
     * Whether we should show debug output
     *
     * @var bool
     */
    protected $_debug = false;

    /**
     * Maximum # of jobs for this process to perform (for memory fail safe)
     *
     * @var int
     */
    protected $_maxJobs = null;

    /**
     * Current job count
     *
     * @var int
     */
    protected $_jobCount = 0;

    /**
     * Parses arguments from the command line and does error handling
     *
     * @param array $argv The $argv from bin/consumer_cli.php
     *
     * @throws Zend_Console_Getopt_Exception on failure
     * @return void
     */
    public function __construct(array $argv)
    {
        try {
            $opt = new Zend_Console_Getopt(
                array(
                    'environment|e=s' => 'environment name (e.g. development)'
                                         . ', required',
                    'server|s=s'      => 'kestrel server, format of host:port'
                                         . ', required',
                    'queue|q=s'       => 'queue name (e.g. crawler_campaign)'
                                         . ', required',
                    'max-jobs|m=s'    => 'max jobs to run before exiting'
                                         . ', optional',
                    'debug|d'         => 'show debug output'
                                         . ', optional',
                )
            );
            $opt->setArguments($argv);
            $opt->parse();

            // Set environment
            if ($opt->e === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing environment'
                );
            }
            $this->_environment = $opt->e;

            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV')) {
                define('APPLICATION_ENV', $this->_environment);
            }
            // @codeCoverageIgnoreEnd

            // Set server
            if ($opt->s === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing server'
                );
            }
            $parts = explode(':', $opt->s);
            if (count($parts) !== 2) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: invalid server: ' . $opt->s
                );
            }
            $this->_host = $parts[0];
            $this->_port = $parts[1];

            // Set queue
            if ($opt->q === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing queue'
                );
            }
            $this->_queue = $opt->q;

            // Set max-jobs (the option arrives as a string, so cast it)
            if ($opt->m !== null) {
                $this->_maxJobs = (int) $opt->m;
            }

            // Set debug
            if ($opt->d !== null) {
                $this->_debug = true;
            }
        } catch (Zend_Console_Getopt_Exception $e) {
            echo "\n" . $e->getMessage() . "\n\n";
            echo $opt->getUsageMessage();
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') {
                exit(1);
            }
            // @codeCoverageIgnoreEnd
        }
        $this->_opt = $opt;
    }

    /**
     * Polls the queue server for jobs and runs them as they come in
     *
     * @return void
     */
    public function run()
    {
        $client = $this->_getKestrelClient();
        $queue  = 'enterprise_' . $this->_queue;

        while ($this->_keepRunning()) {
            // Pull job from queue
            $job = $client->reliableRead($queue, 500);
            if ($job === false) {
                $this->_debug('Nothing on queue ' . $queue);
                continue;
            }

            if (!isset($job['instance'])) {
                echo 'Instance not set in queue job: ' . print_r($job, true);
                continue;
            }
            $instance = $job['instance'];

            if (!isset($job['jobName'])) {
                echo 'Job name not set in queue job: ' . print_r($job, true);
                continue;
            }
            $jobName = $job['jobName'];

            $data = null;
            if (isset($job['data'])) {
                $data = $job['data'];
            }

            // Run the job; on failure, return it to the queue
            $returnCode = $this->runJob($instance, $jobName, $data);
            if ($returnCode !== 0) {
                $client->abortReliableRead($queue);
                continue;
            }
        }

        // Confirm the final job so it is not left open on the queue
        $client->closeReliableRead($queue);
    }

    /**
     * Runs the job via bin/ecli.php
     *
     * @param string $instance The instance name to run the job under
     * @param string $jobName  The job name
     * @param mixed  $data     Optional extra data
     *
     * @return int
     */
    public function runJob($instance, $jobName, $data)
    {
        $cmd = BASE_PATH . '/bin/ecli.php '
            . '-e ' . $this->_environment
            . ' -i ' . $instance
            . ' -j ' . $jobName;
        if ($data) {
            $cmd .= " '" . base64_encode(json_encode($data)) . "'";
        }
        $returnCode = $this->_passthru($cmd);
        $this->_jobCount++;
        $this->_debug('Job count: ' . $this->_jobCount);
        return $returnCode;
    }

    /**
     * Check to see if the job limit has been reached
     *
     * @return bool
     */
    protected function _keepRunning()
    {
        return ($this->_maxJobs === null) ? true
               : ($this->_jobCount < $this->_maxJobs);
    }

    /**
     * Show debug messages
     *
     * @param mixed $message
     *
     * @return void
     */
    protected function _debug($message)
    {
        if (!$this->_debug) {
            return;
        }
        echo $message . "\n";
    }

    // @codeCoverageIgnoreStart
    /**
     * Calls the passthru() function and returns the exit code. Abstracted
     * for testing.
     *
     * @param string $cmd The command to execute
     *
     * @return int
     */
    protected function _passthru($cmd)
    {
        passthru($cmd, $returnCode);
        return $returnCode;
    }

    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }
        return new EC_KestrelClient($this->_host, $this->_port);
    }
    // @codeCoverageIgnoreEnd
}
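
To make the reliable-read flow in `run()` easier to follow, here is a minimal simulation that needs no kestrel server. `FakeReliableQueue` and `consume()` are hypothetical stand-ins. The fake models the documented behaviour that an aborted item goes back to the head of the queue, and, unlike the real consumer, the loop stops when the queue is empty instead of polling again.

```php
<?php
// In-memory stand-in for a queue with open/close/abort semantics.
class FakeReliableQueue
{
    private $items;
    private $open = null;

    public function __construct(array $items)
    {
        $this->items = $items;
    }

    // Confirms the previously opened item, then opens the next one.
    public function reliableRead()
    {
        $this->closeReliableRead();
        if (count($this->items) === 0) {
            return false;
        }
        $this->open = array_shift($this->items);
        return $this->open;
    }

    // Confirms (removes) the currently open item, if any.
    public function closeReliableRead()
    {
        $this->open = null;
    }

    // Returns the currently open item to the head of the queue.
    public function abortReliableRead()
    {
        if ($this->open !== null) {
            array_unshift($this->items, $this->open);
            $this->open = null;
        }
    }

    public function remaining(): int
    {
        return count($this->items);
    }
}

// Runs jobs until the queue is empty or $maxJobs runs have happened.
function consume(FakeReliableQueue $queue, callable $runJob, int $maxJobs): int
{
    $jobCount = 0;
    while ($jobCount < $maxJobs) {
        $job = $queue->reliableRead();
        if ($job === false) {
            break; // the real consumer would keep polling here
        }
        $jobCount++;
        if ($runJob($job) !== 0) {
            $queue->abortReliableRead(); // failed: put the job back
        }
    }
    $queue->closeReliableRead(); // confirm the final job
    return $jobCount;
}

$attempts = array();
$runJob = function ($job) use (&$attempts) {
    $attempts[] = $job;
    // Fail the very first attempt, succeed afterwards
    return (count($attempts) === 1) ? 1 : 0;
};
$queue = new FakeReliableQueue(array('a', 'b'));
$count = consume($queue, $runJob, 5);
echo implode(',', $attempts), ' (', $count, " runs)\n"; // a,a,b (3 runs)
```

Note that a failed run still counts toward the job limit, which matches `runJob()` incrementing the counter regardless of the return code.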

Putting it together

Now that all the pieces are put together, let's take a look at it in action. Adding an example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:

<?php
$producer = new EC_Producer();
$producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar'));
?>

And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:

./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
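
The `stdClass Object` in the output follows from how the payload travels: the consumer appends `base64_encode(json_encode($data))` to the command line, and decoding that with `json_decode()` without the associative flag yields an object rather than an array. The job runner is not shown in this post, so the decoding step below is an assumption about what `ecli.php` does with the argument.

```php
<?php
// What the consumer appends to the command line
$arg = base64_encode(json_encode(array('foo' => 'bar')));
echo $arg, "\n"; // eyJmb28iOiJiYXIifQ==

// Assumed decoding on the receiving side (not shown in the post)
$data = json_decode(base64_decode($arg));
print_r($data); // a stdClass object with [foo] => bar
```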

That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.
