- /*
- * memcache キュークラス
- * 複数プロセスの同時書き込みと読み取りをサポート
- * 書き込み中の読み取り、AB 側ローテーション置換
- * @author guoyu
- * @create on 9:25 2014 -9-28
- * @qq テクノロジー産業交流グループ: 136112330
- *
- * @example:
- * $obj = new memcacheQueue('duilie')
- * $obj->add('1asdf'); $obj->getQueueLength();
- * $obj->read(11);
- */
- class memcacheQueue{
- public static $client;クライアント接続
- public $access; //キューを更新できるかどうか
- private $currentSide; //現在のローテーションのキューサイド: A/B
- private $lastSide; //前のローテーションのキューサイド: A/ B
- private $sideAHead; // サイド A
- の最初の値 private $sideATail; // サイド A
- の最初の値 private $sideBTail;サイド B の
- private $ currentHead; // チームの現在の先頭の値
- private $currentTail; // チームの現在の末尾の値
- private $; lastTail; // チームの最後のラウンドの最後の値
- private $expire // 有効期限、秒、つまり 30 日以内 0 は有効期限が切れないことを意味します
- private $sleepTime;ロック解除, マイクロ秒
- private $queueName; // キュー名、一意の値
- private $retryNum; // 再試行数 = 10 * 理論上の同時実行数
- const MAXNUM = 2000; // (片面) キューの最大数推奨される上限は 10K です
- const HEAD_KEY = '_lkkQueueHead_' // キューのヘッド
- const TAIL_KEY = '_lkkQueueTail_' // キューの末尾のキー
- const VALU_KEY = '_lkkQueueValu_' // キューの値のキー
- LOCK_KEY = ' _lkkQueueLock_'; // キューのロック キー
- const SIDE_KEY = '_lkkQueueSide_'; // ローテーション サイド キー
-
- /*
- * コンストラクター
- * @param [config] 配列 memcache サーバー パラメーター
- * @param [queueName] 文字列キュー名
- * @param [expire] 文字列の有効期限
- * @return NULL
- */
- public function __construct($queueName = '',$expire='',$config =''){
- if(empty($config)) {
- self::$client = memcache_pconnect('localhost',11211);
- }elseif(is_array($config )){//array('host'=>'127.0.0.1','port'=> '11211')
- self::$client = memcache_pconnect($config['host'],$config[' port']);
- }elseif(is_string($config)){//"127.0.0.1:11211"
- $tmp =explode(':',$config);
- $conf['host'] = isset( $tmp[0]) $tmp[0] : '127.0.0.1'; '] = isset($tmp[1]) ? $tmp[1] : '11211'; :$client = memcache_pconnect($conf['host'],$conf['port']);
- if(!self::$client) return false;
-
- ignore_user_abort(TRUE);//クライアント切断時 接続を開いて継続実行を許可
- set_time_limit(0);//スクリプト実行遅延の上限を解除
-
- $this->access = false;
- $this->sleepTime = 1000;
- $expire = (empty($expire) && $expire!=0) : (int)$expire; >expire = $expire;
- $this->queueName = $queueName;
-
- $side = memcache_add(self::$client, $queueName, ' A',false, $expire);
- $this->getHeadNTail($queueName);
- if(!isset($this ->sideAHead) || empty($this->sideAHead)) $this-> ;sideAHead = 0;
- if(!isset($this->sideATail) || empty($this->sideATail) ) $this->sideATail = 0;
- if(!isset($this-> SideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
- if(!isset( $this->sideBHead) || empty($this->sideBHead) ->sideBHead = 0;
-
- /*
- * 获取队列首尾值
- * @param [queueName] string 队列名
- * @return NULL
- */
- private function getHeadNTail($queueName){
- $this->sideAHead = (int)memcache_get( self::$client, $queueName.'A'.self::HEAD_KEY);
- $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
- $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'.self::HEAD_KEY);
- $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'.self::TAIL_KEY);
- }
-
- /*
- * 获取当前轮值の队列面
- * @return string 队列面名
- */
- public function getCurrentSide(){
- $currentSide = memcache_get(self::$client, $this->queueName .self::SIDE_KEY);
- if($currentSide == 'A'){
- $this->currentSide = 'A';
- $this->lastSide = 'B';
-
- $this->currentHead = $this->sideAHead;
- $this->currentTail = $this->sideATail;
- $this->lastHead = $this->sideBHead;
- $this->lastTail = $this->sideBTail;
- }else{
- $this->currentSide = 'B';
- $this->lastSide = 'A';
-
- $this->currentHead = $this->sideBHead;
- $this->currentTail = $this->sideBTail;
- $this->lastHead = $this->sideAHead;
- $this->lastTail = $this->sideATail;
- }
-
- return $this->currentSide;
- }
-
- /*
- * 队列加锁
- * @return boolean
- */
- private function getLock(){
- if($this->access === false){
- while(!memcache_add(self:: $client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
- usleep($this->sleepTime);
- @$i++;
- if($i > $this->retryNum){//尝试等待ちN次
- return false;
- 休憩;
- }
- }
- return $this->access = true;
- }
- false を返します。
- }
-
- /*
- * 队列解锁
- * @return NULL
- */
- private function unLock(){
- memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
- $this->access = false;
- }
-
- /*
- * 追加データ
- * @param [data] 要存储的值
- * @return boolean
- */
- public function add($data){
- $result = false;
- if(!$this->getLock()){
- return $result;
- }
- $this->getHeadNTail($this->queueName);
- $this->getCurrentSide();
-
- if($this->isFull()){
- $this->unLock();
- false を返します。
- }
-
- if($this->currentTail $value_key = $this->queueName .$this->currentSide 。 self::VALU_KEY 。 $this->currentTail;
- if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
- $this->changeTail();
- $result = true;
- }
- }else{//当前队列已满,更换轮值面
- $this->unLock();
- $this->changeCurrentSide();
- return $this->add($data);
- }
-
- $this->unLock();
- $result を返す;
- }
-
- /*
- * 取出データデータ
- * @param [length] int データの長さ
- * @return array
- */
- public function get($length=0){
- if(!is_numeric($length)) false を返します。
- if(empty($length)) $length = self::MAXNUM * 2;//默认读取すべて
- if(!$this->getLock()) return false;
-
- if($this->isEmpty()){
- $this->unLock();
- false を返します。
- }
-
- $keyArray = $this->getKeyArray($length);
- $lastKey = $keyArray['lastKey'];
- $currentKey = $keyArray['currentKey'];
- $keys = $keyArray['keys'];
- $this->changeHead($this->lastSide,$lastKey);
- $this->changeHead($this->currentSide,$currentKey);
-
- $data = @memcache_get(self::$client, $keys);
- foreach($keys as $v){//取出之後删除
- @memcache_delete(self::$client, $v, 0);
- }
- $this->unLock();
-
- $data を返します。
- }
-
- /*
- * 读取数据
- * @param [length] int データの長さ
- * @return array
- */
- public function read($length=0){
- if(!is_numeric($length) ) false を返します。
- if(empty($length)) $length = self::MAXNUM * 2;//默认读取すべて
- $keyArray = $this->getKeyArray($length);
- $data = @memcache_get(self::$client, $keyArray['keys']);
- $data を返します。
- }
-
- /*
- * 获取队列の特定の段数のキー数组
- * @param [length] int 队列长度
- * @return array
- */
- private function getKeyArray($length){
- $result = array('キー'=>array()、'lastKey'=>array()、'currentKey'=>array());
- $this->getHeadNTail($this->queueName);
- $this->getCurrentSide();
- if(empty($length)) $result を返します。
-
- //先取上一面のキー
- $i = $result['lastKey'] = 0;
- for($i=0;$i $result['lastKey'] = $this->lastHead + $i;
- if($result['lastKey'] >= $this->lastTail) Break;
- $result['keys'][] = $this->queueName .$this->lastSide 。 self::VALU_KEY 。 $result['lastKey'];
- }
-
- //再取得当前のキー
- $j = $length - $i;
- $k = $result['currentKey'] = 0;
- for($k=0;$k $result['currentKey'] = $this->currentHead + $k;
- if($result['currentKey'] >= $this->currentTail) Break;
- $result['keys'][] = $this->queueName .$this->currentSide 。 self::VALU_KEY 。 $result['currentKey'];
- }
-
- $result を返します。
- }
-
- /*
- * 当前轮值面队列尾の值
- * @return NULL
- */
- private function changeTail(){
- $tail_key = $this->queueName .$this->currentSide 。自分::TAIL_KEY;
- memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
- //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
- $v = memcache_get(self::$client, $tail_key) +1;
- memcache_set(self::$client, $tail_key,$v,false,$this->expire);
- }
-
- /*
- * 更新队列首の值
- * @param [side] string 要更新的面
- * @param [headValue] int 队列首の值
- * @return NULL
- */
- private function changeHead( $side,$headValue){
- if($headValue $head_key = $this->queueName .$side 。自分::HEAD_KEY;
- $tail_key = $this->queueName .$side 。自分::TAIL_KEY;
- $sideTail = memcache_get(self::$client, $tail_key);
- if($headValue memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
- }elseif($headValue >= $sideTail){
- $this->resetSide($side);
- }
- }
-
- /*
- * 重置队列面、即将该队列面の队首、队尾值置は0
- * @param [side] string 要重置面
- * @return NULL
- */
- プライベート関数resetSide($side){
- $head_key = $this->queueName .$side .自分::HEAD_KEY;
- $tail_key = $this->queueName .$side 。自分自身::TAIL_KEY;
- memcache_set(self::$client, $head_key,0,false,$this->expire);
- memcache_set(self::$client, $tail_key,0,false,$this->expire);
- }
-
- /*
- * 改变当前轮值队列面
- * @return string
- */
- private function changeCurrentSide(){
- $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY) ;
- if($currentSide == 'A'){
- memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
- $this->currentSide = 'B';
- }else{
- memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
- $this->currentSide = 'A';
- }
- return $this->currentSide;
- }
-
- /*
- * 检查当前队列否否已满
- * @return boolean
- */
- public function isFull(){
- $result = false;
- if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
- $result = true;
- }
- $result を返します。
- }
-
- /*
- * 检查当前队列否か空
- * @return boolean
- */
- public function isEmpty(){
- $result = true;
- if($this->sideATail > 0 || $this->sideBTail > 0){
- $result = false;
- }
- $result を返します。
- }
-
- /*
- * 取当前队列の長さ
- * この長さは理论長さです、特定の要素は期限切れによる損失、真实長さはこの長さ以下です
- * @return int
- */
- パブリック関数 getQueueLength( ){
- $this->getHeadNTail($this->queueName);
- $this->getCurrentSide();
-
- $sideALength = $this->sideATail - $this->sideAHead;
- $sideBLength = $this->sideBTail - $this->sideBHead;
- $result = $sideALength + $sideBLength;
-
- $result を返す;
- }
-
- /*
- * 清空当前队列データ、仅保持HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
- * @return boolean
- */
- public function clear(){
- if(!$this->getLock() ) false を返します。
- for($i=0;$i<:maxnum> @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0 );
- @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
- }
- $this->unLock();
- $this->resetSide('A');
- $this->resetSide('B');
- true を返します。
- }
-
- /*
- * 清除すべてmemcache缓存数据
- * @return NULL
- */
- public function memFlush(){
- memcache_flush(self::$client);
- }
-
- }
复制代
|