首页  >  文章  >  后端开发  >  php实现的memcached队列类

php实现的memcached队列类

WBOY
WBOY原创
2016-07-25 08:43:27881浏览
  1. /*
  2. * memcache队列类
  3. * 支持多进程并发写入、读取
  4. * 边写边读,AB面轮值替换
  5. * @author guoyu
  6. * @create on 9:25 2014-9-28
  7. * @qq技术行业交流群:136112330
  8. *
  9. * @example:
  10. * $obj = new memcacheQueue('duilie');
  11. * $obj->add('1asdf');
  12. * $obj->getQueueLength();
  13. * $obj->read(11);
  14. * $obj->get(8);
  15. */
  16. class memcacheQueue{
  17. public static $client; //memcache客户端连接
  18. public $access; //队列是否可更新
  19. private $currentSide; //当前轮值的队列面:A/B
  20. private $lastSide; //上一轮值的队列面:A/B
  21. private $sideAHead; //A面队首值
  22. private $sideATail; //A面队尾值
  23. private $sideBHead; //B面队首值
  24. private $sideBTail; //B面队尾值
  25. private $currentHead; //当前队首值
  26. private $currentTail; //当前队尾值
  27. private $lastHead; //上轮队首值
  28. private $lastTail; //上轮队尾值
  29. private $expire; //过期时间,秒,1~2592000,即30天内;0为永不过期
  30. private $sleepTime; //等待解锁时间,微秒
  31. private $queueName; //队列名称,唯一值
  32. private $retryNum; //重试次数,= 10 * 理论并发数
  33. const MAXNUM = 2000; //(单面)最大队列数,建议上限10K
  34. const HEAD_KEY = '_lkkQueueHead_'; //队列首kye
  35. const TAIL_KEY = '_lkkQueueTail_'; //队列尾key
  36. const VALU_KEY = '_lkkQueueValu_'; //队列值key
  37. const LOCK_KEY = '_lkkQueueLock_'; //队列锁key
  38. const SIDE_KEY = '_lkkQueueSide_'; //轮值面key
  39. /*
  40. * 构造函数
  41. * @param [config] array memcache服务器参数
  42. * @param [queueName] string 队列名称
  43. * @param [expire] string 过期时间
  44. * @return NULL
  45. */
  46. public function __construct($queueName ='',$expire='',$config =''){
  47. if(empty($config)){
  48. self::$client = memcache_pconnect('localhost',11211);
  49. }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
  50. self::$client = memcache_pconnect($config['host'],$config['port']);
  51. }elseif(is_string($config)){//"127.0.0.1:11211"
  52. $tmp = explode(':',$config);
  53. $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
  54. $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
  55. self::$client = memcache_pconnect($conf['host'],$conf['port']);
  56. }
  57. if(!self::$client) return false;
  58. ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
  59. set_time_limit(0);//取消脚本执行延时上限
  60. $this->access = false;
  61. $this->sleepTime = 1000;
  62. $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
  63. $this->expire = $expire;
  64. $this->queueName = $queueName;
  65. $this->retryNum = 10000;
  66. $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
  67. $this->getHeadNTail($queueName);
  68. if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
  69. if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
  70. if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
  71. if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
  72. }
  73. /*
  74. * 获取队列首尾值
  75. * @param [queueName] string 队列名称
  76. * @return NULL
  77. */
  78. private function getHeadNTail($queueName){
  79. $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
  80. $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
  81. $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
  82. $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
  83. }
  84. /*
  85. * 获取当前轮值的队列面
  86. * @return string 队列面名称
  87. */
  88. public function getCurrentSide(){
  89. $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
  90. if($currentSide == 'A'){
  91. $this->currentSide = 'A';
  92. $this->lastSide = 'B';
  93. $this->currentHead = $this->sideAHead;
  94. $this->currentTail = $this->sideATail;
  95. $this->lastHead = $this->sideBHead;
  96. $this->lastTail = $this->sideBTail;
  97. }else{
  98. $this->currentSide = 'B';
  99. $this->lastSide = 'A';
  100. $this->currentHead = $this->sideBHead;
  101. $this->currentTail = $this->sideBTail;
  102. $this->lastHead = $this->sideAHead;
  103. $this->lastTail = $this->sideATail;
  104. }
  105. return $this->currentSide;
  106. }
  107. /*
  108. * 队列加锁
  109. * @return boolean
  110. */
  111. private function getLock(){
  112. if($this->access === false){
  113. while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
  114. usleep($this->sleepTime);
  115. @$i++;
  116. if($i > $this->retryNum){//尝试等待N次
  117. return false;
  118. break;
  119. }
  120. }
  121. return $this->access = true;
  122. }
  123. return false;
  124. }
  125. /*
  126. * 队列解锁
  127. * @return NULL
  128. */
  129. private function unLock(){
  130. memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
  131. $this->access = false;
  132. }
  133. /*
  134. * 添加数据
  135. * @param [data] 要存储的值
  136. * @return boolean
  137. */
  138. public function add($data){
  139. $result = false;
  140. if(!$this->getLock()){
  141. return $result;
  142. }
  143. $this->getHeadNTail($this->queueName);
  144. $this->getCurrentSide();
  145. if($this->isFull()){
  146. $this->unLock();
  147. return false;
  148. }
  149. if($this->currentTail $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
  150. if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
  151. $this->changeTail();
  152. $result = true;
  153. }
  154. }else{//当前队列已满,更换轮值面
  155. $this->unLock();
  156. $this->changeCurrentSide();
  157. return $this->add($data);
  158. }
  159. $this->unLock();
  160. return $result;
  161. }
  162. /*
  163. * 取出数据
  164. * @param [length] int 数据的长度
  165. * @return array
  166. */
  167. public function get($length=0){
  168. if(!is_numeric($length)) return false;
  169. if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
  170. if(!$this->getLock()) return false;
  171. if($this->isEmpty()){
  172. $this->unLock();
  173. return false;
  174. }
  175. $keyArray = $this->getKeyArray($length);
  176. $lastKey = $keyArray['lastKey'];
  177. $currentKey = $keyArray['currentKey'];
  178. $keys = $keyArray['keys'];
  179. $this->changeHead($this->lastSide,$lastKey);
  180. $this->changeHead($this->currentSide,$currentKey);
  181. $data = @memcache_get(self::$client, $keys);
  182. foreach($keys as $v){//取出之后删除
  183. @memcache_delete(self::$client, $v, 0);
  184. }
  185. $this->unLock();
  186. return $data;
  187. }
  188. /*
  189. * 读取数据
  190. * @param [length] int 数据的长度
  191. * @return array
  192. */
  193. public function read($length=0){
  194. if(!is_numeric($length)) return false;
  195. if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
  196. $keyArray = $this->getKeyArray($length);
  197. $data = @memcache_get(self::$client, $keyArray['keys']);
  198. return $data;
  199. }
  200. /*
  201. * 获取队列某段长度的key数组
  202. * @param [length] int 队列长度
  203. * @return array
  204. */
  205. private function getKeyArray($length){
  206. $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
  207. $this->getHeadNTail($this->queueName);
  208. $this->getCurrentSide();
  209. if(empty($length)) return $result;
  210. //先取上一面的key
  211. $i = $result['lastKey'] = 0;
  212. for($i=0;$i $result['lastKey'] = $this->lastHead + $i;
  213. if($result['lastKey'] >= $this->lastTail) break;
  214. $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
  215. }
  216. //再取当前面的key
  217. $j = $length - $i;
  218. $k = $result['currentKey'] = 0;
  219. for($k=0;$k $result['currentKey'] = $this->currentHead + $k;
  220. if($result['currentKey'] >= $this->currentTail) break;
  221. $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
  222. }
  223. return $result;
  224. }
  225. /*
  226. * 更新当前轮值面队列尾的值
  227. * @return NULL
  228. */
  229. private function changeTail(){
  230. $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
  231. memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
  232. //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
  233. $v = memcache_get(self::$client, $tail_key) +1;
  234. memcache_set(self::$client, $tail_key,$v,false,$this->expire);
  235. }
  236. /*
  237. * 更新队列首的值
  238. * @param [side] string 要更新的面
  239. * @param [headValue] int 队列首的值
  240. * @return NULL
  241. */
  242. private function changeHead($side,$headValue){
  243. if($headValue $head_key = $this->queueName .$side . self::HEAD_KEY;
  244. $tail_key = $this->queueName .$side . self::TAIL_KEY;
  245. $sideTail = memcache_get(self::$client, $tail_key);
  246. if($headValue memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
  247. }elseif($headValue >= $sideTail){
  248. $this->resetSide($side);
  249. }
  250. }
  251. /*
  252. * 重置队列面,即将该队列面的队首、队尾值置为0
  253. * @param [side] string 要重置的面
  254. * @return NULL
  255. */
  256. private function resetSide($side){
  257. $head_key = $this->queueName .$side . self::HEAD_KEY;
  258. $tail_key = $this->queueName .$side . self::TAIL_KEY;
  259. memcache_set(self::$client, $head_key,0,false,$this->expire);
  260. memcache_set(self::$client, $tail_key,0,false,$this->expire);
  261. }
  262. /*
  263. * 改变当前轮值队列面
  264. * @return string
  265. */
  266. private function changeCurrentSide(){
  267. $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
  268. if($currentSide == 'A'){
  269. memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
  270. $this->currentSide = 'B';
  271. }else{
  272. memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
  273. $this->currentSide = 'A';
  274. }
  275. return $this->currentSide;
  276. }
  277. /*
  278. * 检查当前队列是否已满
  279. * @return boolean
  280. */
  281. public function isFull(){
  282. $result = false;
  283. if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
  284. $result = true;
  285. }
  286. return $result;
  287. }
  288. /*
  289. * 检查当前队列是否为空
  290. * @return boolean
  291. */
  292. public function isEmpty(){
  293. $result = true;
  294. if($this->sideATail > 0 || $this->sideBTail > 0){
  295. $result = false;
  296. }
  297. return $result;
  298. }
  299. /*
  300. * 获取当前队列的长度
  301. * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
  302. * @return int
  303. */
  304. public function getQueueLength(){
  305. $this->getHeadNTail($this->queueName);
  306. $this->getCurrentSide();
  307. $sideALength = $this->sideATail - $this->sideAHead;
  308. $sideBLength = $this->sideBTail - $this->sideBHead;
  309. $result = $sideALength + $sideBLength;
  310. return $result;
  311. }
  312. /*
  313. * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
  314. * @return boolean
  315. */
  316. public function clear(){
  317. if(!$this->getLock()) return false;
  318. for($i=0;$i<:maxnum> @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);
  319. @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
  320. }
  321. $this->unLock();
  322. $this->resetSide('A');
  323. $this->resetSide('B');
  324. return true;
  325. }
  326. /*
  327. * 清除所有memcache缓存数据
  328. * @return NULL
  329. */
  330. public function memFlush(){
  331. memcache_flush(self::$client);
  332. }
  333. }
复制代码

php, memcached


声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn