首页 >php教程 >PHP源码 >memcache队列类

memcache队列类

PHP中文网
PHP中文网原创
2016-05-25 17:05:511011浏览


add('1asdf');
 *		$obj->getQueueLength();
 *		$obj->read(11);
 *		$obj->get(8);
 */

class memcacheQueue{
	public static	$client;			//memcache客户端连接
	public			$access;			//队列是否可更新	
	private 		$currentSide;		//当前轮值的队列面:A/B
	private			$lastSide;			//上一轮值的队列面:A/B
	private 		$sideAHead;			//A面队首值
	private 		$sideATail;			//A面队尾值
	private 		$sideBHead;			//B面队首值
	private 		$sideBTail;			//B面队尾值
	private			$currentHead;		//当前队首值
	private			$currentTail;		//当前队尾值
	private			$lastHead;			//上轮队首值
	private			$lastTail;			//上轮队尾值	
	private 		$expire;			//过期时间,秒,1~2592000,即30天内
	private			$sleepTime;			//等待解锁时间,微秒
	private			$queueName;			//队列名称,唯一值
	private			$retryNum;			//重试次数,= 10 * 理论并发数
	
	const	MAXNUM		= 10000;				//(单面)最大队列数,建议上限10K
	const	HEAD_KEY	= '_lkkQueueHead_';		//队列首kye
	const	TAIL_KEY	= '_lkkQueueTail_';		//队列尾key
	const	VALU_KEY	= '_lkkQueueValu_';		//队列值key
	const	LOCK_KEY	= '_lkkQueueLock_';		//队列锁key
	const	SIDE_KEY	= '_lkkQueueSide_';		//轮值面key
	
	/*
	 * 构造函数
	 * @param	[queueName]	string	队列名称
	 * @param	[expire]	string	过期时间
	 * @param	[config]	array	memcache服务器参数	 
	 * @return	NULL
	 */
	public function __construct($queueName ='',$expire='',$config =''){
		if(empty($config)){
			self::$client = memcache_pconnect('127.0.0.1',11211);
		}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
			self::$client = memcache_pconnect($config['host'],$config['port']);
		}elseif(is_string($config)){//"127.0.0.1:11211"
			$tmp = explode(':',$config);
			$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
			$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
			self::$client = memcache_pconnect($conf['host'],$conf['port']);
		}
		if(!self::$client) return false;
		
		ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
		set_time_limit(0);//取消脚本执行延时上限
		
		$this->access = false;
		$this->sleepTime = 1000;
		$expire = (empty($expire)) ? 3600 : (int)$expire+1;
		$this->expire = $expire;
		$this->queueName = $queueName;
		$this->retryNum = 20000;
		
		$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
		$this->getHeadNTail($queueName);
		if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
		if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
		if(!isset($this->sideBTail) || empty($this->sideBTail)) $this->sideBTail = 0;
	}
	
	/*
	 * 获取队列首尾值
	 * @param	[queueName]	string	队列名称
	 * @return	NULL
	 */
	private function getHeadNTail($queueName){
		$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
		$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
		$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
		$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
	}
	
	/*
	 * 获取当前轮值的队列面
	 * @return	string	队列面名称
	 */
	public function getCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == 'A'){
			$this->currentSide = 'A';
			$this->lastSide = 'B';	

			$this->currentHead	= $this->sideAHead;
			$this->currentTail	= $this->sideATail;
			$this->lastHead		= $this->sideBHead;
			$this->lastTail		= $this->sideBTail;			
		}else{
			$this->currentSide = 'B';
			$this->lastSide = 'A';

			$this->currentHead	= $this->sideBHead;
			$this->currentTail	= $this->sideBTail;
			$this->lastHead		= $this->sideAHead;
			$this->lastTail		= $this->sideATail;						
		}
		
		return $this->currentSide;
	}
	
	/*
	 * 队列加锁
	 * @return boolean
	 */
	private function getLock(){
		if($this->access === false){
			while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
				usleep($this->sleepTime);
				@$i++;
				if($i > $this->retryNum){//尝试等待N次
					return false;
					break;
				}
			}
			return $this->access = true;
		}
		return false;
	}
	
	/*
	 * 队列解锁
	 * @return NULL
	 */
	private function unLock(){
		memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
		$this->access = false;
	}
	
	/*
	 * 添加数据
	 * @param	[data]	要存储的值
	 * @return	boolean
	 */
	public function add($data=''){
		$result = false;
		if(empty($data)) return $result;
		if(!$this->getLock()){
			return $result;
		} 
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		
		if($this->isFull()){
			$this->unLock();
			return false;
		}
		
		if($this->currentTail < self::MAXNUM){
			$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
			if(memcache_set(self::$client, $value_key, $data, false, $this->expire)){
				$this->changeTail();
				$result = true;
			}
		}else{//当前队列已满,更换轮值面
			$this->unLock();
			$this->changeCurrentSide();
			return $this->add($data);
		}

		$this->unLock();
		return $result;
	}
	
	/*
	 * 取出数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function get($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		if(!$this->getLock()) return false;

		if($this->isEmpty()){
			$this->unLock();
			return false;
		}
		
		$keyArray	= $this->getKeyArray($length);
		$lastKey	= $keyArray[&#39;lastKey&#39;];
		$currentKey	= $keyArray[&#39;currentKey&#39;];
		$keys		= $keyArray[&#39;keys&#39;];
		$this->changeHead($this->lastSide,$lastKey);
		$this->changeHead($this->currentSide,$currentKey);
		
		$data	= @memcache_get(self::$client, $keys);
		if(empty($data)) $data = array();
		foreach($keys as $v){//取出之后删除
			@memcache_delete(self::$client, $v, 0);
		}
		$this->unLock();

		return $data;
	}
	
	/*
	 * 读取数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function read($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		$keyArray	= $this->getKeyArray($length);
		$data	= @memcache_get(self::$client, $keyArray[&#39;keys&#39;]);
		if(empty($data)) $data = array();
		return $data;
	}
	
	/*
	 * 获取队列某段长度的key数组
	 * @param	[length]	int	队列长度
	 * @return	array
	 */
	private function getKeyArray($length){
		$result = array(&#39;keys&#39;=>array(),&#39;lastKey&#39;=>null,&#39;currentKey&#39;=>null);
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		if(empty($length)) return $result;
		
		//先取上一面的key
		$i = $result[&#39;lastKey&#39;] = 0;
		for($i=0;$ilastHead + $i;
			if($result[&#39;lastKey&#39;] >= $this->lastTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result[&#39;lastKey&#39;];
		}
		
		//再取当前面的key
		$j = $length - $i;
		$k = $result[&#39;currentKey&#39;] = 0;
		for($k=0;$kcurrentHead + $k;
			if($result[&#39;currentKey&#39;] >= $this->currentTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result[&#39;currentKey&#39;];
		}

		return $result;
	}
	
	/*
	 * 更新当前轮值面队列尾的值
	 * @return	NULL
	 */
	private function changeTail(){
		$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
		memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
		$v = memcache_get(self::$client, $tail_key) +1;
		memcache_set(self::$client, $tail_key,$v,false,$this->expire);
	}
	
	/*
	 * 更新队列首的值
	 * @param	[side]		string	要更新的面
	 * @param	[headValue]	int		队列首的值
	 * @return	NULL
	 */
	private function changeHead($side,$headValue){
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		$sideTail = memcache_get(self::$client, $tail_key);
		if($headValue < $sideTail){
			memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
		}elseif($headValue >= $sideTail){
			$this->resetSide($side);
		}
	}
	
	/*
	 * 重置队列面,即将该队列面的队首、队尾值置为0
	 * @param	[side]	string	要重置的面
	 * @return	NULL
	 */
	private function resetSide($side){
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		memcache_set(self::$client, $head_key,0,false,$this->expire);
		memcache_set(self::$client, $tail_key,0,false,$this->expire);
	}
	
	
	/*
	 * 改变当前轮值队列面
	 * @return	string
	 */
	private function changeCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == &#39;A&#39;){
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;B&#39;,false,$this->expire);
			$this->currentSide = &#39;B&#39;;
		}else{
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;A&#39;,false,$this->expire);
			$this->currentSide = &#39;A&#39;;
		}
		return $this->currentSide;
	}
	
	/*
	 * 检查当前队列是否已满
	 * @return	boolean
	 */
	public function isFull(){
		$result = false;
		if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
			$result = true;
		}
		return $result;
	}
	
	/*
	 * 检查当前队列是否为空
	 * @return	boolean
	 */
	public function isEmpty(){
		$result = true;
		if($this->sideATail > 0 || $this->sideBTail > 0){
			$result = false;
		}
		return $result;
	}
	
	/*
	 * 获取当前队列的长度
	 * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
	 * @return	int
	 */
	public function getQueueLength(){
		$this->getHeadNTail($this->queueName);

		$sideALength = $this->sideATail - $this->sideAHead;
		$sideBLength = $this->sideBTail - $this->sideBHead;
		$result = $sideALength + $sideBLength;
		
		return $result;
	}
	

	/*
	 * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
	 * @return	boolean
	 */
	public function clear(){
		if(!$this->getLock()) return false;
		$this->getHeadNTail($this->queueName);
		$AHead = $this->sideAHead;$AHead--;
		$ATail = $this->sideATail;$ATail++;
		$BHead = $this->sideBHead;$BHead--;		
		$BTail = $this->sideBTail;$BTail++;
		
		//删除A面
		for($i=$AHead;$iqueueName.&#39;A&#39;. self::VALU_KEY .$i, 0);
		}
		
		//删除B面
		for($j=$BHead;$jqueueName.&#39;A&#39;. self::VALU_KEY .$j, 0);
		}

		$this->unLock();
		$this->resetSide(&#39;A&#39;);
		$this->resetSide(&#39;B&#39;);
		return true;
	}
	
	/*
	 * 清除所有memcache缓存数据
	 * @return	NULL
	 */
	public function memFlush(){
		memcache_flush(self::$client);
	}



}

                                       

                   

2. [文件] test.php ~ 3KB       

               

<?php
//memcacheQueue队列类测试
header("Content-type: text/html; charset=utf-8;private, must-revalidate");
require_once(&#39;memcacheQueue.class.php&#39;);
//获取时间,精度微秒
function microtimeFloat(){
    list($usec, $sec) = explode(" ", microtime());
    return ((float)$usec + (float)$sec);
}


$queObj = new memcacheQueue(&#39;duilie&#39;);
$client = memcache_pconnect(&#39;127.0.0.1&#39;,11211);
$length = 20000;


/*------------------------------------------
 * 队列类测试
 *------------------------------------------
 */

//写数据
$time_add_start = microtimeFloat();
for($i=1;$i

                           

       

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn