Rumah >php教程 >PHP源码 >php的memcache队列类

php的memcache队列类

PHP中文网
PHP中文网asal
2016-05-25 17:09:32995semak imbas

memcacheQueue.class.php 

<?php
/*
 * memcache队列类
 * 支持多进程并发写入、读取
 * 边写边读,AB面轮值替换
 * @author lkk/lianq.net
 * @create on 9:25 2012-9-28
 *
 *
 * @example:
 *		$obj = new memcacheQueue(&#39;duilie&#39;);
 *		$obj->add(&#39;1asdf&#39;);
 *		$obj->getQueueLength();
 *		$obj->read(11);
 *		$obj->get(8);
 */

class memcacheQueue{
	public static	$client;			//memcache客户端连接
	public			$access;			//队列是否可更新	
	private 		$currentSide;		//当前轮值的队列面:A/B
	private			$lastSide;			//上一轮值的队列面:A/B
	private 		$sideAHead;			//A面队首值
	private 		$sideATail;			//A面队尾值
	private 		$sideBHead;			//B面队首值
	private 		$sideBTail;			//B面队尾值
	private			$currentHead;		//当前队首值
	private			$currentTail;		//当前队尾值
	private			$lastHead;			//上轮队首值
	private			$lastTail;			//上轮队尾值	
	private 		$expire;			//过期时间,秒,1~2592000,即30天内;0为永不过期
	private			$sleepTime;			//等待解锁时间,微秒
	private			$queueName;			//队列名称,唯一值
	private			$retryNum;			//重试次数,= 10 * 理论并发数
	
	const	MAXNUM		= 2000;					//(单面)最大队列数,建议上限10K
	const	HEAD_KEY	= &#39;_lkkQueueHead_&#39;;		//队列首kye
	const	TAIL_KEY	= &#39;_lkkQueueTail_&#39;;		//队列尾key
	const	VALU_KEY	= &#39;_lkkQueueValu_&#39;;		//队列值key
	const	LOCK_KEY	= &#39;_lkkQueueLock_&#39;;		//队列锁key
	const	SIDE_KEY	= &#39;_lkkQueueSide_&#39;;		//轮值面key
	
	/*
	 * 构造函数
	 * @param	[config]	array	memcache服务器参数
	 * @param	[queueName]	string	队列名称
	 * @param	[expire]	string	过期时间
	 * @return	NULL
	 */
	public function __construct($queueName =&#39;&#39;,$expire=&#39;&#39;,$config =&#39;&#39;){
		if(empty($config)){
			self::$client = memcache_pconnect(&#39;localhost&#39;,11211);
		}elseif(is_array($config)){//array(&#39;host&#39;=>&#39;127.0.0.1&#39;,&#39;port&#39;=>&#39;11211&#39;)
			self::$client = memcache_pconnect($config[&#39;host&#39;],$config[&#39;port&#39;]);
		}elseif(is_string($config)){//"127.0.0.1:11211"
			$tmp = explode(&#39;:&#39;,$config);
			$conf[&#39;host&#39;] = isset($tmp[0]) ? $tmp[0] : &#39;127.0.0.1&#39;;
			$conf[&#39;port&#39;] = isset($tmp[1]) ? $tmp[1] : &#39;11211&#39;;
			self::$client = memcache_pconnect($conf[&#39;host&#39;],$conf[&#39;port&#39;]);		
		}
		if(!self::$client) return false;
		
		ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
		set_time_limit(0);//取消脚本执行延时上限
		
		$this->access = false;
		$this->sleepTime = 1000;
		$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
		$this->expire = $expire;
		$this->queueName = $queueName;
		$this->retryNum = 10000;
		
		$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, &#39;A&#39;,false, $expire);
		$this->getHeadNTail($queueName);
		if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
		if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
	}
	
	/*
	 * 获取队列首尾值
	 * @param	[queueName]	string	队列名称
	 * @return	NULL
	 */
	private function getHeadNTail($queueName){
		$this->sideAHead = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::HEAD_KEY);
		$this->sideATail = (int)memcache_get(self::$client, $queueName.&#39;A&#39;. self::TAIL_KEY);
		$this->sideBHead = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::HEAD_KEY);
		$this->sideBTail = (int)memcache_get(self::$client, $queueName.&#39;B&#39;. self::TAIL_KEY);
	}
	
	/*
	 * 获取当前轮值的队列面
	 * @return	string	队列面名称
	 */
	public function getCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == &#39;A&#39;){
			$this->currentSide = &#39;A&#39;;
			$this->lastSide = &#39;B&#39;;	

			$this->currentHead	= $this->sideAHead;
			$this->currentTail	= $this->sideATail;
			$this->lastHead		= $this->sideBHead;
			$this->lastTail		= $this->sideBTail;			
		}else{
			$this->currentSide = &#39;B&#39;;
			$this->lastSide = &#39;A&#39;;

			$this->currentHead	= $this->sideBHead;
			$this->currentTail	= $this->sideBTail;
			$this->lastHead		= $this->sideAHead;
			$this->lastTail		= $this->sideATail;						
		}
		
		return $this->currentSide;
	}
	
	/*
	 * 队列加锁
	 * @return boolean
	 */
	private function getLock(){
		if($this->access === false){
			while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
				usleep($this->sleepTime);
				@$i++;
				if($i > $this->retryNum){//尝试等待N次
					return false;
					break;
				}
			}
			return $this->access = true;
		}
		return false;
	}
	
	/*
	 * 队列解锁
	 * @return NULL
	 */
	private function unLock(){
		memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
		$this->access = false;
	}
	
	/*
	 * 添加数据
	 * @param	[data]	要存储的值
	 * @return	boolean
	 */
	public function add($data){
		$result = false;
		if(!$this->getLock()){
			return $result;
		} 
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		
		if($this->isFull()){
			$this->unLock();
			return false;
		}
		
		if($this->currentTail < self::MAXNUM){
			$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
			if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
				$this->changeTail();
				$result = true;
			}
		}else{//当前队列已满,更换轮值面
			$this->unLock();
			$this->changeCurrentSide();
			return $this->add($data);
		}

		$this->unLock();
		return $result;
	}
	
	/*
	 * 取出数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function get($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		if(!$this->getLock()) return false;

		if($this->isEmpty()){
			$this->unLock();
			return false;
		}
		
		$keyArray	= $this->getKeyArray($length);
		$lastKey	= $keyArray[&#39;lastKey&#39;];
		$currentKey	= $keyArray[&#39;currentKey&#39;];
		$keys		= $keyArray[&#39;keys&#39;];
		$this->changeHead($this->lastSide,$lastKey);
		$this->changeHead($this->currentSide,$currentKey);
		
		$data	= @memcache_get(self::$client, $keys);
		foreach($keys as $v){//取出之后删除
			@memcache_delete(self::$client, $v, 0);
		}
		$this->unLock();

		return $data;
	}
	
	/*
	 * 读取数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function read($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		$keyArray	= $this->getKeyArray($length);
		$data	= @memcache_get(self::$client, $keyArray[&#39;keys&#39;]);
		return $data;
	}
	
	/*
	 * 获取队列某段长度的key数组
	 * @param	[length]	int	队列长度
	 * @return	array
	 */
	private function getKeyArray($length){
		$result = array(&#39;keys&#39;=>array(),&#39;lastKey&#39;=>array(),&#39;currentKey&#39;=>array());
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		if(empty($length)) return $result;
		
		//先取上一面的key
		$i = $result[&#39;lastKey&#39;] = 0;
		for($i=0;$i<$length;$i++){
			$result[&#39;lastKey&#39;] = $this->lastHead + $i;
			if($result[&#39;lastKey&#39;] >= $this->lastTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result[&#39;lastKey&#39;];
		}
		
		//再取当前面的key
		$j = $length - $i;
		$k = $result[&#39;currentKey&#39;] = 0;
		for($k=0;$k<$j;$k++){
			$result[&#39;currentKey&#39;] = $this->currentHead + $k;
			if($result[&#39;currentKey&#39;] >= $this->currentTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result[&#39;currentKey&#39;];
		}

		return $result;
	}
	
	/*
	 * 更新当前轮值面队列尾的值
	 * @return	NULL
	 */
	private function changeTail(){
		$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
		memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
		//memcache_increment(self::$client, $tail_key, 1);//队列尾+1
		$v = memcache_get(self::$client, $tail_key) +1;
		memcache_set(self::$client, $tail_key,$v,false,$this->expire);
	}
	
	/*
	 * 更新队列首的值
	 * @param	[side]		string	要更新的面
	 * @param	[headValue]	int		队列首的值
	 * @return	NULL
	 */
	private function changeHead($side,$headValue){
		if($headValue < 1) return false;
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		$sideTail = memcache_get(self::$client, $tail_key);
		if($headValue < $sideTail){
			memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
		}elseif($headValue >= $sideTail){
			$this->resetSide($side);
		}
	}
	
	/*
	 * 重置队列面,即将该队列面的队首、队尾值置为0
	 * @param	[side]	string	要重置的面
	 * @return	NULL
	 */
	private function resetSide($side){
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		memcache_set(self::$client, $head_key,0,false,$this->expire);
		memcache_set(self::$client, $tail_key,0,false,$this->expire);
	}
	
	
	/*
	 * 改变当前轮值队列面
	 * @return	string
	 */
	private function changeCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == &#39;A&#39;){
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;B&#39;,false,$this->expire);
			$this->currentSide = &#39;B&#39;;
		}else{
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;A&#39;,false,$this->expire);
			$this->currentSide = &#39;A&#39;;
		}
		return $this->currentSide;
	}
	
	/*
	 * 检查当前队列是否已满
	 * @return	boolean
	 */
	public function isFull(){
		$result = false;
		if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
			$result = true;
		}
		return $result;
	}
	
	/*
	 * 检查当前队列是否为空
	 * @return	boolean
	 */
	public function isEmpty(){
		$result = true;
		if($this->sideATail > 0 || $this->sideBTail > 0){
			$result = false;
		}
		return $result;
	}
	
	/*
	 * 获取当前队列的长度
	 * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
	 * @return	int
	 */
	public function getQueueLength(){
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();

		$sideALength = $this->sideATail - $this->sideAHead;
		$sideBLength = $this->sideBTail - $this->sideBHead;
		$result = $sideALength + $sideBLength;
		
		return $result;
	}
	

	/*
	 * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
	 * @return	boolean
	 */
	public function clear(){
		if(!$this->getLock()) return false;
		for($i=0;$i<self::MAXNUM;$i++){
			@memcache_delete(self::$client, $this->queueName.&#39;A&#39;. self::VALU_KEY .$i, 0);
			@memcache_delete(self::$client, $this->queueName.&#39;B&#39;. self::VALU_KEY .$i, 0);
		}
		$this->unLock();
		$this->resetSide(&#39;A&#39;);
		$this->resetSide(&#39;B&#39;);
		return true;
	}
	
	/*
	 * 清除所有memcache缓存数据
	 * @return	NULL
	 */
	public function memFlush(){
		memcache_flush(self::$client);
	}


}

                               

                   

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn