/** * 共有メモリを使用した PHP 循環メモリ キューの実装 * 複数のプロセスをサポートし、さまざまなデータ型のストレージをサポート * 注: エンキューまたはデキュー操作が完了したら、できるだけ早く unset() を使用してクリティカル セクションを解放します * * @author wangbinandi@gmail.com * @作成 2009-12-23 */ class ShmQueue { private $ maxQSize = 0 ; // キューの最大長 private $front = 0; // キューの先頭ポインタ
private $blockSize = 256; private $ memSize = 25600; // 最大共有メモリ (バイト)private $shmId = 0;
private $filePtr = './shmq.ptr'; private $semId = 0; public function __construct() { $shmkey = ftok(__FILE__, 't');
$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize ); $this->maxQSize = $this ->memSize / $this->blockSize; // セマフォを適用しますか?$this->semId = sem_get($shmkey, 1); sem_acquire($this->semId); / クリティカルセクションに入るために適用します $this->init(); }
プライベート関数 init() { if ( file_exists($this->filePtr) ){$contents = file_get_contents($ this-> ;filePtr); $data =explode( '|', $contents ); if ( isset($data[0]) && isset($data[1])){ $this->front = (int )$data[0]; $this->rear = (int)$data[1]; } } }
public function getLength() { return (($this-> ;rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;}
public function enQueue( $value ) { if ( $this ->ptrInc($this->rear) == $this->front ){ // キューがいっぱいreturn false; }
$data = $this->encode($value) ; shmop_write ($this->shmId, $data, $this->rear ); $this->rear = $this->ptrInc($this->rear);return true; }
public function deQueue() { if ( $this->front == $this->rear ){ // キューが空ですreturn false; } $value = shmop_read($this-> shmId, $this->front, $this->blockSize-1); $this->front = $this->ptrInc($this->front); return $this->decode ($value ); }
プライベート関数 ptrInc( $ptr ) { return ($ptr + $this->blockSize) % ($this->memSize);}
プライベート関数 encode( $値) { $data = Serialize($value) . "__eof";echo '';
echo strlen($data); echo '';
echo $this->blockSize -1; echo '' ;
if ( strlen($data) > $this->blockSize -1 ){ throw new Exception(strlen($data)." はオーバーロード ブロック サイズです!"); }return $ data; }
プライベート関数 decode( $value ) { $data =explode("__eof", $value);return unserialize($data[0]); }
パブリック関数 __destruct() { $ data = $this->フロント . $this->rear;file_put_contents($this->filePtr, $data);
sem_release($this->semId); / クリティカルエリア外でセマフォを解放 } } /* // キュー操作 $shmq = new ShmQueue();$data = 'テストデータ'; $shmq->enQueue($data ); unset($shmq); // デキュー操作 $shmq = new ShmQueue(); $data = $shmq->deQueue(); unset($shmq); */ ?>>
メッセージキューが大きい場合、大規模なデータベースのシリアル化と逆シリアル化を頻繁に行うのは時間がかかりすぎます。以下は、PHP を使用して実装したメッセージ キューです。末尾にデータを挿入し、末尾を操作するだけで済みます。読み取りや操作のためにメッセージ キュー全体を操作する必要はありません。ただし、このメッセージ キューはスレッドセーフではありません。競合の可能性を回避しようとしているだけです。メッセージの密度がそれほど高くない場合 (たとえば、数秒に 1 つのメッセージだけである場合)、この方法での使用を検討できます。 スレッドの安全性を実現したい場合、ファイルをロックしてから操作することをお勧めします。以下はコードです: コードは次のとおりです:
代码如下 | 复制代码 | class Memcache_Queue { private $memcache; プライベート $name; プライベート $プレフィックス; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache オブジェクトが null、最初にオブジェクトを新規作成します。") ; } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->サイズ = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); $this を返す; } 関数 isEmpty() { return $this->size == 0; } 関数 isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); $this を返す; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("サイズ"); $this->delete($this->front); $this->set("フロント", ($this->フロント + 1) % $this->maxSize); $this を返す; } 関数 getTop() { return $this->get($this->front); } 関数 getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this ->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } 関数 makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("フロント"); $this->delete("サイズ"); $this->delete("maxSize"); } プライベート関数 getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this-> maxSize) { $keys[] = $this->getKeyByPos($pos); } $keys を返します。 } プライベート関数 add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); $this を返す; } プライベート関数 increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } プライベート関数 decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } プライベート関数 set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); $this を返す; } プライベート関数 get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } プライベート関数 delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } プライベート関数 getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }
|
|