memcacheQueue.class.php
const MAXNUM = 20000; //最大队列数,建议上限10K
const HEAD_KEY = '_lkkQueueHead_'; //队列首kye
const TAIL_KEY = '_lkkQueueTail_'; //队列尾key
const VALU_KEY = '_lkkQueueValu_'; //队列值key
const LOCK_KEY = '_lkkQueueLock_'; //队列锁key
/**
*コンストラクター
* @param String $ queuename queue name*/
public function __construct($queueName ='',$expire=0,$config =''){
if(empty($config)){
self::$client = memcache_pconnect ('127.0.0.1',11211);
}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
self: :$client = memcache_pconnect($config['host'],$config['port']);
}elseif(is_string($config)){//"127.0.0.1:11211"
$tmp =explode(' :',$config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'],$conf['port']);
}
if(!self::$client) return false ;
ignore_user_abort(true);//当客户断开连接,允许继续行
set_time_limit(0);//取消画面行延時間上限
$this->access = false;
$this->gt; sleepTime = 1000;
$expire = empty($expire) ? 3600 : intval($expire)+1;
$this->expire = $expire;
$this->queueName = $queueName;
$this->retryNum = 1000;
$this->head_key = $this->queueName 。 self::HEAD_KEY;
$this->tail_key = $this->queueName 。 self::TAIL_KEY;
$this->lock_key = $this->queueName 。 self::LOCK_KEY;
$this->_initSetHeadNTail();
}
/**
* キューの最初と最後の値を初期化して設定します
*/
private function _initSetHeadNTail(){
//当前队列首の数值
$this->currentHead = memcache_get(self::$client, $this->head_key);
if($this->currentHead === false) $this->currentHead =0;
//当前队列尾の数值
$this->currentTail = memcache_get(self::$client, $this->tail_key);
if($this->currentTail === false) $this->currentTail =0;
}
/**
* 要素を取り出す際はキュー先頭の値を変更します
* @param int $step ステップサイズの値
*/
private function _changeHead($step=1){
$this->currentHead += $step;
memcache_set(self::$client, $this->head_key,$this- >currentHead,false,$this->expire);
}
/**
* 要素を追加する場合は、キューの最後尾の値を変更します
* @param int $step ステップ値
* @param bool $reverse 反転するかどうか
* @return null
*/
private function _changeTail($step=1, $reverse =false){
if(!$reverse){
$this->currentTail += $step;
}else{
$this->currentTail -= $step;
}
memcache_set(self::$client, $this->tail_key,$this- >currentTail,false,$this->expire);
}
/**
* キューが空かどうか
* @return bool
*/
private function _isEmpty(){
return (bool)($this->currentHead === $this ->currentTail);
}
/**
* キューはいっぱいですか
* @return bool
*/
プライベート関数 _isFull(){
$len = $this->currentTail - $this->currentHead;
return (bool)($ len === self::MAXNUM);
}
/**
* キューロック
*/
プライベート関数 _getLock(){
if($this->access === false){
while(!memcache_add( self::$client, $this->lock_key, 1, false, $this->expire) ){
usleep($this->sleepTime);
@$i++;
if($i > $this->retryNum){///尝试等待N次
return false;
break;
}
}
$this->_initSetHeadNTail();
return $this->access = true;
}
return $this->access;
}
/**
* キューのロックが解除されました
*/
プライベート関数 _unLock (){
memcache_delete(self::$client, $this->lock_key, 0);
$this->access = false;
}
/**
* 現在のキューの長さを取得します
* この長さは理論上の長さであり、一部の要素は期限切れにより失われます。実際の長さ * @return int
*/
public function getQueueLength() {
$this->_initSetHeadNTail();
return intval($this->currentTail - $this->currentHead);
}
/**
* キューデータを追加します
* @param void $data 追加するデータ
* @return bool
*/
public function add($data) {
if(!$this->_getLock()) return false;
if($this->_isFull()){
$this->_unLock();
return false;
}
$ value_key = $this->queueName 。 self::VALU_KEY 。 strval($this->currentTail +1);
$result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
if($result){
$this ->_changeTail();
}
$this->_unLock();
return $result;
}
/**
* キューデータの読み取り
* @param int $length 読み取る長さ(逆読み取りの場合は負の数を使用)
* @return array
*/
public function read($length=0){
if (!is_numeric($length)) return false;
$this->_initSetHeadNTail();
if($this->_isEmpty()){
return false;
}
if(empty($length) ) $length = self::MAXNUM;//默认全部
$keyArr = array();
if($length >0){//正向读取(从队列首向队列尾)
$tmpMin = $ this->currentHead;
$tmpMax = $tmpMin + $length;
for($i= $tmpMin; $i<=$tmpMax; $i++){
$keyArr[] = $this->queueName 。 self::VALU_KEY 。 $i;
}
}else{//逆方向读取(从队列尾向队列首)
$tmpMax = $this->currentTail;
$tmpMin = $tmpMax + $length;
for($i= $tmpMax; $i >$tmpMin; $i--){
$keyArr[] = $this->queueName 。 self::VALU_KEY 。 $i;
}
}
$result = @memcache_get(self::$client, $keyArr);
return $result;
}
/**
*/
public function get($length =0){
if(!is_numeric($length)) return false;
if(!$this->_getLock()) return false;
if($this->_isEmpty()){
$this ->_unLock();
return false;
}
if(empty($length)) $length = self::MAXNUM;//默认全部
$length = intval($length);
$keyArr = array ();
if($length >0){//正向读取(从队列首向队列尾)
$tmpMin = $this->currentHead;
$tmpMax = $tmpMin + $length;
for ($i= $tmpMin; $i<=$tmpMax; $i++){
$keyArr[] = $this->queueName 。 self::VALU_KEY 。 $i;
}
$this->_changeHead($length);
}else{//逆方向读取(从队列尾向队列首)
$tmpMax = $this->currentTail;
$tmpMin = $tmpMax + $length;
for($i= $tmpMax; $i >$tmpMin; $i--){
$keyArr[] = $this->queueName 。 self::VALU_KEY 。 $i;
}
$this->_changeTail(abs($length), true);
}
$result = @memcache_get(self::$client, $keyArr);
foreach($keyArr as $v){//取出之後删除
@memcache_delete(self::$client, $v, 0);
}
$this->_unLock();
return $result;
}
/**
* キューをクリアします
*/
public function clear(){
if(!$this->_getLock()) return false;
if($this->>_isEmpty()){
$this->>_unLock();
return false;
}
$tmpMin = $this->currentHead--;
$tmpMax = $this->currentTail++;
for($i= $tmpMin; $i<=$tmpMax; $i++){
$tmpKey = $this->queueName 。 self::VALU_KEY 。 $i;
@memcache_delete(self::$client, $tmpKey, 0);
}
$this->currentTail = $this->currentHead = 0;
memcache_set(self::$client, $this ->head_key,$this->currentHead,false,$this->expire);
memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this- >expire);
$this->_unLock();
}
/*
* 清除すべてmemcache缓存数据
*/
public function memFlush(){
memcache_flush(self::$client);
}
}//授業終了