
Repost: Kestrel PHP Message Queue

We've been using Twitter's kestrel queue server for a while now at work, but only from our service layer, which is written in Python. Our application layer, written in PHP, now has queueing needs of its own, so I spent a few days this week adding queue support to our web application. I thought I'd share what I learned and how I implemented it.

Goals

The kestrel server itself was pretty straightforward to get up and running. My one recommendation is to stick to release branches, as master was fairly unstable when I tried to use it. For the client, I had a few goals in mind when I started:

- Since kestrel is built on the memcache protocol, try to leverage an existing memcache client rather than build one from scratch
- Use our existing batch job infrastructure, which I covered in a previous post, and make sure our multi-tenant needs are met
- Keep the queue interface generic in case we change queue servers later
- Use existing kestrel management tools, and only build out the functionality we need
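The producer and consumer in this post talk to EC_KestrelClient directly. The following is therefore only a sketch of what "keep the queue interface generic" could look like. It assumes a hypothetical QueueInterface plus an in-memory implementation for tests; neither appears in the original code. It needs PHP 7.4+ for the typed property.

```php
<?php
// Hypothetical interface (not part of the original code): producers and
// consumers would depend on this instead of on EC_KestrelClient directly.
interface QueueInterface
{
    /** Adds an item to the named queue; returns true on success. */
    public function push(string $queue, array $item): bool;

    /** Removes and returns the next item, or null if the queue is empty. */
    public function pop(string $queue): ?array;
}

// Hypothetical in-memory implementation, handy for unit tests.
final class InMemoryQueue implements QueueInterface
{
    /** @var array<string, array> items per queue name, oldest first */
    private array $queues = [];

    public function push(string $queue, array $item): bool
    {
        $this->queues[$queue][] = $item;
        return true;
    }

    public function pop(string $queue): ?array
    {
        if (empty($this->queues[$queue])) {
            return null;
        }
        return array_shift($this->queues[$queue]); // FIFO order
    }
}

$queue = new InMemoryQueue();
$queue->push('enterprise_hello_world', ['jobName' => 'HelloWorld']);
$job   = $queue->pop('enterprise_hello_world');
$empty = $queue->pop('enterprise_hello_world');
```

A kestrel-backed class could implement the same two methods. Swapping queue servers would then mean writing one new class rather than touching the producer and consumer.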

With these goals in mind, I ended up with four components: a kestrel client, a producer, a consumer, and a very small CLI harness for running the consumer. Before coding anything, though, I set up kestrel web, a web UI for kestrel written by my co-worker Matt Erkkila. Kestrel web lets you view kestrel statistics, manage queues, and sort and filter queues based on manual inputs. Having it running from the get-go made it easy to watch jobs get added to and consumed from my test queue, and to flush the queues as needed.

The Kestrel Client

I couldn't find any existing kestrel clients for PHP, so I looked at the two memcache extensions: the older memcache, and Andrei Zmievski's memcached, which is based on the libmemcached library. I started with memcache. It worked fine initially, but I quickly found that I could not modify its timeouts. This interfered with the way kestrel recommends you poll it for new jobs: the memcache extension threw timeout errors whenever I set the poll timeout to 1 second (the memcache default) or higher. The memcached extension does not have these issues, so I went with it.
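Kestrel's blocking poll is requested in the key itself. EC_KestrelClient appends options such as /close, /open and /t=<milliseconds> to the queue name. The client's own read timeout has to outlast that server-side wait, which is consistent with the 1-second errors described above. Below is a minimal sketch of building such keys with a hypothetical kestrelKey() helper (not from the original code).

```php
<?php
/**
 * Hypothetical helper: builds a kestrel GET key by appending option
 * segments ("close", "open", "abort") and an optional "t=<ms>" wait to the
 * queue name, in the same format EC_KestrelClient::reliableRead() uses.
 */
function kestrelKey(string $queue, array $options = [], ?int $timeoutMs = null): string
{
    $parts = [$queue];
    foreach ($options as $option) {
        $parts[] = $option;
    }
    if ($timeoutMs !== null) {
        // Ask the server to wait up to $timeoutMs milliseconds for an item
        $parts[] = 't=' . $timeoutMs;
    }
    return implode('/', $parts);
}

$plainPoll    = kestrelKey('enterprise_hello_world', [], 1000);
$reliablePoll = kestrelKey('enterprise_hello_world', ['close', 'open'], 500);
```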

The first gotcha I ran into was serialization. You can use memcached's serializer when writing to kestrel, but when memcached reads the data back, it doesn't recognize that the data is serialized, because kestrel doesn't send back the serialization flags. So I serialize the data manually (as JSON) in my client, and things work fine. You'll also want to disable compression, or handle it manually. The memcached extension automatically compresses anything over 100 bytes by default, and will not decompress it when reading from kestrel.
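Here is a minimal sketch of that manual JSON round trip. It uses hypothetical encodeJob()/decodeJob() helpers that mirror what EC_KestrelClient::set() and reliableRead() do. The string $wire stands in for the raw bytes kestrel stores and hands back, with no flags attached.

```php
<?php
/** Hypothetical helper: encode before SET, since kestrel keeps only bytes. */
function encodeJob(array $job): string
{
    // JSON_THROW_ON_ERROR (PHP 7.3+) avoids silently storing `false`
    return json_encode($job, JSON_THROW_ON_ERROR);
}

/**
 * Hypothetical helper: decode after GET.
 *
 * @param string|false $raw Raw value from GET; false means nothing was queued
 * @return array|false
 */
function decodeJob($raw)
{
    if ($raw === false) {
        return false;
    }
    // true => associative arrays, which is what the consumer indexes into
    return json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
}

// Simulated round trip: only the JSON string crosses the wire
$wire = encodeJob(['instance' => 'dev', 'jobName' => 'HelloWorld', 'data' => ['foo' => 'bar']]);
$job  = decodeJob($wire);
$miss = decodeJob(false);
```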

The other issue is that you can't use any custom kestrel commands. The application layer doesn't need anything fancy, so the memcached extension works fine for it. Once we need support for the upcoming monitor (batching) feature in kestrel 2, we may need to implement a kestrel client from scratch. Kestrel web covers everything else we need right now.

Once I'd settled on memcached, I wrote a light decorator for it, EC_KestrelClient. It handles instantiating the memcached client, serialization, and helpers for some kestrel-specific options to the GET command. It can also pass memcached-specific options through to the underlying client. The class ended up looking like this:


<?php
/**
 * A thin kestrel client that wraps Memcached (libmemcached extension)
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance
     *
     * @var Memcached
     */
    protected $_memcached = null;

    /**
     * The Kestrel server IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var int
     */
    protected $_port = 22133;

    /**
     * Memcached options, applied when the Memcached client is initialized
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options (Memcached::OPT_* => value)
     *
     * @return void
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    )
    {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        return $this->getMemcached()->set($queue, json_encode($data));
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * @param string $queue   The queue name to read from
     * @param int    $timeout How long to wait for a job to appear, in milliseconds
     *
     * @return array|false
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        $queue = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($queue);
        if ($result === false) {
            return $result;
        }
        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function closeReliableRead($queue)
    {
        $queue = $queue . '/close';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Aborts an existing reliable read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function abortReliableRead($queue)
    {
        $queue = $queue . '/abort';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Sets an option to be used with the Memcached client
     *
     * @param int   $name  The option name (a Memcached::OPT_* constant)
     * @param mixed $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param int $name The option name
     *
     * @return mixed
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }
        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets the Memcached client, creating it on first use
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }
        return $this->_memcached;
    }

    /**
     * Initializes the Memcached client instance
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);
        // Kestrel won't hand back the compression flag, so never compress
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}
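To make the reliable-read calls easier to follow, here is a toy in-memory model of the open/close/abort options. It is hypothetical: it does not talk to kestrel and ignores details such as per-connection state. It shows why the consumer aborts after a failed job and closes before exiting: an opened item is not permanently removed until it is closed. The model puts aborted items back at the front of the queue; treat that ordering as an assumption.

```php
<?php
/**
 * Hypothetical toy model of kestrel's reliable-read options, mirroring the
 * GET keys EC_KestrelClient builds:
 *   close/open - confirm the previously opened item, then reserve the next
 *   close      - confirm the opened item without reading another
 *   abort      - hand the opened item back to the queue
 */
final class ReliableReadModel
{
    /** @var array items waiting on the queue, oldest first */
    private array $items;

    /** @var mixed the currently reserved ("open") item, or null */
    private $open = null;

    public function __construct(array $items)
    {
        $this->items = $items;
    }

    /** Like GET queue/close/open: returns the next item, or false if empty. */
    public function closeOpen()
    {
        $this->open = null;                  // close: previous item is gone for good
        if ($this->items === []) {
            return false;
        }
        $this->open = array_shift($this->items);
        return $this->open;
    }

    /** Like GET queue/close. */
    public function close(): void
    {
        $this->open = null;
    }

    /** Like GET queue/abort; re-queues at the front (an assumption). */
    public function abort(): void
    {
        if ($this->open !== null) {
            array_unshift($this->items, $this->open);
            $this->open = null;
        }
    }

    /** Items not yet confirmed: those waiting plus the one currently open. */
    public function unconfirmed(): int
    {
        return count($this->items) + ($this->open === null ? 0 : 1);
    }
}

$queue = new ReliableReadModel(['job1', 'job2']);
$first = $queue->closeOpen();          // reserves 'job1'
$queue->abort();                       // job failed: 'job1' goes back
$retry = $queue->closeOpen();          // 'job1' again
$next  = $queue->closeOpen();          // confirms 'job1', reserves 'job2'
$beforeClose = $queue->unconfirmed();  // 'job2' is still open
$queue->close();                       // what closeReliableRead() does on exit
$afterClose  = $queue->unconfirmed();
```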


The Producer

The producer is very simple. It formats the data into a standard structure that includes the current tenant (instance) name, namespaces the queue so it doesn't collide with other projects, and adds the job to the queue. The producer looks like this:


<?php
/**
 * Interface for adding jobs to a queue server
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Producer
{
    /**
     * Adds a job onto a queue
     *
     * @param string $queue   The queue name to add a job to
     * @param string $jobName The job name for the consumer to run
     * @param mixed  $data    Optional additional data to pass to the job
     *
     * @return bool
     */
    public function addJob($queue, $jobName, $data = null)
    {
        $item = array(
            'instance' => EC::getCurrentInstanceName(),
            'jobName'  => $jobName
        );
        if ($data !== null) {
            $item['data'] = $data;
        }

        // Namespace queue with project
        $queue = 'enterprise_' . $queue;

        $client = $this->_getKestrelClient();
        return $client->set($queue, $item);
    }

    // @codeCoverageIgnoreStart
    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }

        static $client = null;
        if ($client === null) {
            $host = EC::getConfigOption('kestrel.host');
            $port = EC::getConfigOption('kestrel.port');
            $client = new EC_KestrelClient($host, $port);
        }
        return $client;
    }
    // @codeCoverageIgnoreEnd
}
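EC_Producer pulls the tenant from EC::getCurrentInstanceName() and the client from config, so it can't run on its own. Below is a framework-free sketch of just its payload logic. It uses a hypothetical buildJob() function that takes the instance name as a parameter. It shows the envelope the consumer later validates (instance, jobName, optional data) and the enterprise_ namespace prefix.

```php
<?php
/**
 * Hypothetical, framework-free version of EC_Producer::addJob()'s payload
 * logic. Returns [namespaced queue name, job envelope] without touching kestrel.
 */
function buildJob(string $instance, string $queue, string $jobName, $data = null): array
{
    $item = [
        'instance' => $instance,
        'jobName'  => $jobName,
    ];
    if ($data !== null) {
        $item['data'] = $data;   // only present when the caller passes data
    }
    // Namespace the queue so it can't collide with other projects' queues
    return ['enterprise_' . $queue, $item];
}

[$queueName, $item] = buildJob('dev', 'hello_world', 'HelloWorld', ['foo' => 'bar']);
[, $bare] = buildJob('dev', 'hello_world', 'HelloWorld');
```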


The Consumer

The consumer has a bit more to it, though it's still pretty straightforward. It's intended to be run under a process monitor like daemontools or supervisord, so there is a very small CLI harness that just passes the CLI arguments into EC_Consumer and runs it. After parsing the arguments, EC_Consumer polls kestrel for new jobs and runs them through our standard batch job infrastructure. Until we have more confidence in PHP's ability to run long-lived processes, I added an optional maximum jobs argument. With it set, the consumer exits after processing that many jobs, and the monitoring service (supervisord) restarts it within a few seconds. I also added an optional debug argument for testing, so you can see every action as it happens. The CLI harness looks like this:


#!/usr/bin/env php
<?php
// External application bootstrapping
require_once __DIR__ . '/cli_init.php';

// Instantiate and run the consumer
$consumer = new EC_Consumer($argv);
$consumer->run();


And the main consumer class, EC_Consumer, looks something like this:

<?php
/**
 * Enterprise queue consumer interface, called by bin/consumer_cli.php
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Consumer
{
    /**
     * Instance of {@link Zend_Console_Getopt}
     *
     * @var Zend_Console_Getopt
     */
    protected $_opt = null;

    /**
     * Which APPLICATION_ENV to run under (see -e)
     *
     * @var string
     */
    protected $_environment = null;

    /**
     * The kestrel server IP
     *
     * @var string
     */
    protected $_host = null;

    /**
     * The kestrel server port
     *
     * @var int
     */
    protected $_port = null;

    /**
     * The kestrel queue name to connect to
     *
     * @var string
     */
    protected $_queue = null;

    /**
     * Whether we should show debug output
     *
     * @var bool
     */
    protected $_debug = false;

    /**
     * Maximum # of jobs for this process to perform (for memory fail safe)
     *
     * @var int
     */
    protected $_maxJobs = null;

    /**
     * Current job count
     *
     * @var int
     */
    protected $_jobCount = 0;

    /**
     * Parses arguments from the command line and does error handling. On
     * invalid arguments, prints the usage message and exits with status 1
     * (unless APPLICATION_ENV is 'testing').
     *
     * @param array $argv The $argv from bin/consumer_cli.php
     *
     * @return void
     */
    public function __construct(array $argv)
    {
        try {
            $opt = new Zend_Console_Getopt(
                array(
                    'environment|e=s' => 'environment name (e.g. development)'
                                         . ', required',
                    'server|s=s'      => 'kestrel server, format of host:port'
                                         . ', required',
                    'queue|q=s'       => 'queue name (e.g. crawler_campaign)'
                                         . ', required',
                    'max-jobs|m=s'    => 'max jobs to run before exiting'
                                         . ', optional',
                    'debug|d'         => 'show debug output'
                                         . ', optional',
                )
            );
            $opt->setArguments($argv);
            $opt->parse();

            // Set environment
            if ($opt->e === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing environment'
                );
            }
            $this->_environment = $opt->e;
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV')) {
                define('APPLICATION_ENV', $this->_environment);
            }
            // @codeCoverageIgnoreEnd

            // Set server
            if ($opt->s === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing server'
                );
            }
            $parts = explode(':', $opt->s);
            if (count($parts) !== 2) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: invalid server: ' . $opt->s
                );
            }
            $this->_host = $parts[0];
            $this->_port = (int) $parts[1];

            // Set queue
            if ($opt->q === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing queue'
                );
            }
            $this->_queue = $opt->q;

            // Set max-jobs (cast, since the option value arrives as a string)
            if ($opt->m !== null) {
                $this->_maxJobs = (int) $opt->m;
            }

            // Set debug
            if ($opt->d !== null) {
                $this->_debug = true;
            }
        } catch (Zend_Console_Getopt_Exception $e) {
            echo "\n" . $e->getMessage() . "\n\n";
            echo $opt->getUsageMessage();
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') {
                exit(1);
            }
            // @codeCoverageIgnoreEnd
        }
        $this->_opt = $opt;
    }

    /**
     * Polls the queue server for jobs and runs them as they come in
     *
     * @return void
     */
    public function run()
    {
        $client = $this->_getKestrelClient();
        $queue  = 'enterprise_' . $this->_queue;
        while ($this->_keepRunning()) {
            // Pull job from queue, waiting up to 500ms for one to appear
            $job = $client->reliableRead($queue, 500);
            if ($job === false) {
                $this->_debug('Nothing on queue ' . $queue);
                continue;
            }
            if (!isset($job['instance'])) {
                echo 'Instance not set in queue job: ' . print_r($job, true);
                continue;
            }
            $instance = $job['instance'];
            if (!isset($job['jobName'])) {
                echo 'Job name not set in queue job: ' . print_r($job, true);
                continue;
            }
            $jobName = $job['jobName'];
            $data = null;
            if (isset($job['data'])) {
                $data = $job['data'];
            }

            // Run the job; on failure, abort the read so the job goes back on the queue
            $returnCode = $this->runJob($instance, $jobName, $data);
            if ($returnCode !== 0) {
                $client->abortReliableRead($queue);
                continue;
            }
        }
        $client->closeReliableRead($queue);
    }

    /**
     * Runs the job via bin/ecli.php
     *
     * @param string $instance The instance name to run the job under
     * @param string $jobName  The job name
     * @param mixed  $data     Optional extra data
     *
     * @return int The job's exit code
     */
    public function runJob($instance, $jobName, $data)
    {
        // Escape values that come off the queue before they reach the shell
        $cmd = BASE_PATH . '/bin/ecli.php '
            . '-e ' . escapeshellarg($this->_environment)
            . ' -i ' . escapeshellarg($instance)
            . ' -j ' . escapeshellarg($jobName);
        if ($data) {
            $cmd .= " '" . base64_encode(json_encode($data)) . "'";
        }
        $returnCode = $this->_passthru($cmd);
        $this->_jobCount++;
        $this->_debug('Job count: ' . $this->_jobCount);
        return $returnCode;
    }

    /**
     * Check to see if the job limit has been reached
     *
     * @return bool
     */
    protected function _keepRunning()
    {
        return ($this->_maxJobs === null) ? true
               : ($this->_jobCount < $this->_maxJobs);
    }

    /**
     * Show debug messages
     *
     * @param mixed $message
     *
     * @return void
     */
    protected function _debug($message)
    {
        if (!$this->_debug) {
            return;
        }
        echo $message . "\n";
    }

    // @codeCoverageIgnoreStart
    /**
     * Calls the passthru() function and returns the exit code. Abstracted
     * for testing.
     *
     * @param string $cmd The command to execute
     *
     * @return int
     */
    protected function _passthru($cmd)
    {
        passthru($cmd, $returnCode);
        return $returnCode;
    }

    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }
        return new EC_KestrelClient($this->_host, $this->_port);
    }
    // @codeCoverageIgnoreEnd
}
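Strip out Zend_Console_Getopt, the shell call and kestrel, and the control flow of EC_Consumer::run() reduces to the hypothetical consume() function below. $poll stands in for reliableRead() and $runJob for runJob(); the abort-on-failure step is left out to keep it short. With $maxJobs set, the loop exits after that many jobs have run, which is what lets supervisord restart a fresh process. Empty polls and malformed jobs do not count toward the limit.

```php
<?php
/**
 * Hypothetical, dependency-free sketch of EC_Consumer::run()'s control flow.
 * $poll returns a job array or false; $runJob receives (instance, jobName, data).
 * With $maxJobs === null it loops forever, like the daemon it models.
 */
function consume(callable $poll, callable $runJob, ?int $maxJobs = null): int
{
    $jobCount = 0;
    while ($maxJobs === null || $jobCount < $maxJobs) {
        $job = $poll();
        if ($job === false) {
            continue;                   // empty poll: try again
        }
        if (!isset($job['instance'], $job['jobName'])) {
            continue;                   // malformed job: skipped, not counted
        }
        $runJob($job['instance'], $job['jobName'], $job['data'] ?? null);
        $jobCount++;                    // counted whether or not the job succeeded
    }
    return $jobCount;                   // the process would now exit and be restarted
}

// Canned poll results instead of a live kestrel server
$responses = [
    false,
    ['instance' => 'dev', 'jobName' => 'HelloWorld', 'data' => ['foo' => 'bar']],
    ['jobName' => 'MissingInstance'],
    false,
    ['instance' => 'dev', 'jobName' => 'HelloWorld'],
    ['instance' => 'dev', 'jobName' => 'NotReached'],
];
$ran = [];
$count = consume(
    function () use (&$responses) { return array_shift($responses); },
    function ($instance, $jobName, $data) use (&$ran) { $ran[] = $jobName; },
    2
);
```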


Putting it together

Now that all the pieces are in place, let's see it in action. Adding the example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:

<?php
$producer = new EC_Producer();
$producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar'));
?>


And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:

./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
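The job data reaches bin/ecli.php as a single base64-encoded JSON argument: eyJmb28iOiJiYXIifQ== in the output above is {"foo":"bar"}. Base64 keeps the JSON's quotes and spaces away from the shell, since the base64 alphabet contains no quote characters. The post doesn't show ecli.php, so decodeJobArg() below is a hypothetical sketch of the receiving side. Decoding to objects matches the "stdClass Object" in the output, but that is an inference.

```php
<?php
// Encoding side, as done in EC_Consumer::runJob()
function encodeJobArg($data): string
{
    return base64_encode(json_encode($data, JSON_THROW_ON_ERROR));
}

// Hypothetical decoding side for bin/ecli.php (not shown in the post)
function decodeJobArg(string $arg)
{
    $json = base64_decode($arg, true);   // strict mode rejects non-base64 input
    if ($json === false) {
        throw new InvalidArgumentException('Job data is not valid base64');
    }
    // false => JSON objects become stdClass, like the debug output shows
    return json_decode($json, false, 512, JSON_THROW_ON_ERROR);
}

$arg  = encodeJobArg(['foo' => 'bar']);
$data = decodeJobArg($arg);
```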


That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.
