ホームページ >バックエンド開発 >PHPチュートリアル >PHP memcache がメッセージキューを実装する例
PHP の memcache によるメッセージ キューの実装例
memcache はサーバー キャッシュで広く使用されています。知る必要がある人は参考にしてください。
memche メッセージ キューの原理は、キーについて大騒ぎすることです。キーは、シリアル化後にメッセージまたはログを記録するために連続した番号とプレフィックスを作成するために使用されます。その後、スケジュールされたプログラムを通じてコンテンツがファイルまたはデータベースにドロップされます。
php を使用したメッセージキューの実装 たとえば、メールを送信するときに大量のメールを送信するのに時間がかかる場合は、キューを使用できます。
キューの実装を容易にする軽量のキュー サーバーは次のとおりです。
starling は、memcache プロトコルをサポートする軽量の永続サーバーです。
https://github.com/starling/starling
Beanstalkd は軽量で効率的です。永続性をサポートし、1 秒あたり約 3,000 のキューを処理できます。
http://kr.github.com/beanstalkd/
Memcache/memcached は、php でメッセージ キューを実装するために使用することもできます。
<?php/*** Memcache 消息队列类*/class QMC {const PREFIX = 'ASDFASDFFWQKE';/*** 初始化mc* @staticvar string $mc* @return Memcache*/static private function mc_init() {static $mc = null;if (is_null($mc)) {$mc = new Memcache;$mc->connect('127.0.0.1', 11211);}return $mc;}/*** mc 计数器,增加计数并返回新的计数* @param string $key 计数器* @param int $offset 计数增量,可为负数.0为不改变计数* @param int $time 时间* @return int/false 失败是返回false,成功时返回更新计数器后的计数*/static public function set_counter( $key, $offset, $time=0 ){$mc = self::mc_init();$val = $mc->get($key);if( !is_numeric($val) || $val < 0 ){$ret = $mc->set( $key, 0, $time );if( !$ret ) return false;$val = 0;}$offset = intval( $offset );if( $offset > 0 ){return $mc->increment( $key, $offset );}elseif( $offset < 0 ){return $mc->decrement( $key, -$offset );}return $val;}/*** 写入队列* @param string $key* @param mixed $value* @return bool*/static public function input( $key, $value ){$mc = self::mc_init();$w_key = self::PREFIX.$key.'W';$v_key = self::PREFIX.$key.self::set_counter($w_key, 1);return $mc->set( $v_key, $value );}/*** 读取队列里的数据* @param string $key* @param int $max 最多读取条数* @return array*/static public function output( $key, $max=100 ){$out = array();$mc = self::mc_init();$r_key = self::PREFIX.$key.'R';$w_key = self::PREFIX.$key.'W';$r_p = self::set_counter( $r_key, 0 );//读指针$w_p = self::set_counter( $w_key, 0 );//写指针if( $r_p == 0 ) $r_p = 1;while( $w_p >= $r_p ){if( --$max < 0 ) break;$v_key = self::PREFIX.$key.$r_p;$r_p = self::set_counter( $r_key, 1 );$out[] = $mc->get( $v_key );$mc->delete($v_key);}return $out;}}/**使用方法:QMC::input($key, $value );//写入队列$list = QMC::output($key);//读取队列*/?>
PHP 共有メモリに基づいて実装されたメッセージ キュー:
<?php/*** 使用共享内存的PHP循环内存队列实现* 支持多进程, 支持各种数据类型的存储* 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区** @author [email protected]* @created 2009-12-23*/class ShmQueue{private $maxQSize = 0; // 队列最大长度private $front = 0; // 队头指针private $rear = 0; // 队尾指针private $blockSize = 256; // 块的大小(byte)private $memSize = 25600; // 最大共享内存(byte)private $shmId = 0;private $filePtr = './shmq.ptr';private $semId = 0;public function __construct(){$shmkey = ftok(__FILE__, 't');$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );$this->maxQSize = $this->memSize / $this->blockSize;// 申?一个信号量$this->semId = sem_get($shmkey, 1);sem_acquire($this->semId); // 申请进入临界区$this->init();}private function init(){if ( file_exists($this->filePtr) ){$contents = file_get_contents($this->filePtr);$data = explode( '|', $contents );if ( isset($data[0]) && isset($data[1])){$this->front = (int)$data[0];$this->rear = (int)$data[1];}}}public function getLength(){return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;}public function enQueue( $value ){if ( $this->ptrInc($this->rear) == $this->front ){ // 队满return false;}$data = $this->encode($value);shmop_write($this->shmId, $data, $this->rear );$this->rear = $this->ptrInc($this->rear);return true;}public function deQueue(){if ( $this->front == $this->rear ){ // 队空return false;}$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);$this->front = $this->ptrInc($this->front);return $this->decode($value);}private function ptrInc( $ptr ){return ($ptr + $this->blockSize) % ($this->memSize);}private function encode( $value ){$data = serialize($value) . "__eof";echo '';echo strlen($data);echo '';echo $this->blockSize -1;echo '';if ( strlen($data) > $this->blockSize -1 ){throw new Exception(strlen($data)." is overload block size!");}return $data;}private function decode( $value ){$data = explode("__eof", $value);return unserialize($data[0]);}public function __destruct(){$data = $this->front . '|' . $this->rear;file_put_contents($this->filePtr, $data);sem_release($this->semId); // 出临界区, 释放信号量}}/*// 进队操作$shmq = new ShmQueue();$data = 'test data';$shmq->enQueue($data);unset($shmq);// 出队操作$shmq = new ShmQueue();$data = $shmq->deQueue();unset($shmq);*/?>
メッセージ キューが大きい場合、大規模なデータベースを頻繁にシリアル化および逆シリアル化すると、時間がかかりすぎます。以下は、PHP を使用して実装したメッセージ キューです。末尾にデータを挿入し、末尾を操作するだけで済みます。読み取りや操作のためにメッセージ キュー全体を操作する必要はありません。ただし、このメッセージ キューはスレッドセーフではありません。競合の可能性を回避しようとしているだけです。メッセージの密度がそれほど高くない場合 (たとえば、数秒に 1 つのメッセージだけである場合)、この方法での使用を検討できます。
スレッドの安全性を実現したい場合、ファイルをロックしてから操作することをお勧めします。コードは次のとおりです:
コードは次のとおりです:
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }