Home > Backend Development > PHP Tutorial > Memcached queue class implemented in PHP

Memcached queue class implemented in PHP

WBOY
WBOYOriginal
2016-07-25 08:43:27 · 922 views
  /**
   * Memcache-backed queue.
   *
   * Items are stored under sequentially numbered keys on one of two "sides"
   * (A and B). add() appends to the current side and rotates to the other side
   * once the current one holds MAXNUM entries. get()/read() take keys from the
   * previous side first and then from the current side. A memcache key created
   * with memcache_add() serves as a lock around add(), get() and clear().
   *
   * @author guoyu
   * @create on 9:25 2014-9-28
   * @qq technology exchange group: 136112330
   *
   * @example
   * $obj = new memcacheQueue('duilie');
   * $obj->add('1asdf');
   * $obj->getQueueLength();
   * $obj->read(11);
   * $obj->get(8);
   */
  class memcacheQueue
  {
      public static $client;   // memcache client connection (shared by all instances)
      public $access;          // true while this instance holds the queue lock
      private $currentSide;    // side currently being written to: 'A' or 'B'
      private $lastSide;       // the other (previously written) side: 'A' or 'B'
      private $sideAHead;      // head index of side A
      private $sideATail;      // tail index of side A
      private $sideBHead;      // head index of side B
      private $sideBTail;      // tail index of side B
      private $currentHead;    // head index of the current side
      private $currentTail;    // tail index of the current side
      private $lastHead;       // head index of the previous side
      private $lastTail;       // tail index of the previous side
      private $expire;         // expiration in seconds, 1~2592000 (30 days); 0 means never expires
      private $sleepTime;      // pause between lock attempts, in microseconds
      private $queueName;      // queue name, used as the prefix of every key
      private $retryNum;       // maximum number of lock attempts

      const MAXNUM = 2000;                 // maximum number of entries per side
      const HEAD_KEY = '_lkkQueueHead_';   // key suffix: head index of a side
      const TAIL_KEY = '_lkkQueueTail_';   // key suffix: tail index of a side
      const VALU_KEY = '_lkkQueueValu_';   // key suffix: stored value (followed by its index)
      const LOCK_KEY = '_lkkQueueLock_';   // key suffix: queue lock
      const SIDE_KEY = '_lkkQueueSide_';   // key suffix: name of the current side

      /**
       * Connects to memcache and loads the queue state.
       *
       * @param string       $queueName queue name
       * @param int|string   $expire    expiration in seconds; '' or null selects 3600, 0 means never expires
       * @param array|string $config    array('host' => ..., 'port' => ...) or "host:port";
       *                                empty connects to localhost:11211
       */
      public function __construct($queueName = '', $expire = '', $config = '')
      {
          if (empty($config)) {
              self::$client = memcache_pconnect('localhost', 11211);
          } elseif (is_array($config)) { // array('host'=>'127.0.0.1','port'=>'11211')
              $host = isset($config['host']) ? $config['host'] : '127.0.0.1';
              $port = isset($config['port']) ? (int)$config['port'] : 11211;
              self::$client = memcache_pconnect($host, $port);
          } elseif (is_string($config)) { // "127.0.0.1:11211"
              $parts = explode(':', $config);
              $host = $parts[0] !== '' ? $parts[0] : '127.0.0.1';
              $port = isset($parts[1]) ? (int)$parts[1] : 11211;
              self::$client = memcache_pconnect($host, $port);
          }
          if (!self::$client) {
              // No connection: leave the instance uninitialised. $access stays
              // null, so getLock() refuses and add()/get()/clear() return false.
              return;
          }

          ignore_user_abort(true); // keep running after the client disconnects
          set_time_limit(0);       // remove the script execution time limit

          $this->access = false;
          $this->sleepTime = 1000;
          // Only a missing value selects the default; an explicit 0 is kept and
          // means "never expires". (A loose comparison against 0 here would
          // give different results on PHP 7 and PHP 8.)
          $this->expire = ($expire === '' || $expire === null) ? 3600 : (int)$expire;
          $this->queueName = $queueName;
          $this->retryNum = 10000;

          // Create the side marker if it does not exist yet; an existing value is kept.
          memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A', false, $this->expire);
          // getHeadNTail() casts every value to int, so missing keys become 0.
          $this->getHeadNTail($queueName);
      }

      /**
       * Loads the head and tail indexes of both sides from memcache.
       *
       * @param string $queueName queue name
       * @return void
       */
      private function getHeadNTail($queueName)
      {
          $this->sideAHead = (int)memcache_get(self::$client, $queueName . 'A' . self::HEAD_KEY);
          $this->sideATail = (int)memcache_get(self::$client, $queueName . 'A' . self::TAIL_KEY);
          $this->sideBHead = (int)memcache_get(self::$client, $queueName . 'B' . self::HEAD_KEY);
          $this->sideBTail = (int)memcache_get(self::$client, $queueName . 'B' . self::TAIL_KEY);
      }

      /**
       * Reads the current side from memcache and maps the cached head/tail
       * values of both sides onto the "current" and "last" fields.
       *
       * @return string name of the current side ('A' or 'B')
       */
      public function getCurrentSide()
      {
          $stored = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
          if ($stored === 'A') {
              $this->currentSide = 'A';
              $this->lastSide = 'B';
              $this->currentHead = $this->sideAHead;
              $this->currentTail = $this->sideATail;
              $this->lastHead = $this->sideBHead;
              $this->lastTail = $this->sideBTail;
          } else {
              $this->currentSide = 'B';
              $this->lastSide = 'A';
              $this->currentHead = $this->sideBHead;
              $this->currentTail = $this->sideBTail;
              $this->lastHead = $this->sideAHead;
              $this->lastTail = $this->sideATail;
          }
          return $this->currentSide;
      }

      /**
       * Acquires the queue lock, retrying up to $retryNum times with a pause
       * of $sleepTime microseconds between attempts.
       *
       * @return boolean true when the lock was acquired; false when this
       *                 instance already holds it (or was never initialised)
       *                 or when the retries ran out
       */
      private function getLock()
      {
          if ($this->access !== false) {
              return false;
          }
          $lockKey = $this->queueName . self::LOCK_KEY;
          $attempts = 0;
          // memcache_add() only succeeds when the key does not exist yet.
          while (!memcache_add(self::$client, $lockKey, 1, false, $this->expire)) {
              usleep($this->sleepTime);
              if (++$attempts > $this->retryNum) {
                  return false;
              }
          }
          $this->access = true;
          return true;
      }

      /**
       * Releases the queue lock.
       *
       * @return void
       */
      private function unLock()
      {
          memcache_delete(self::$client, $this->queueName . self::LOCK_KEY);
          $this->access = false;
      }

      /**
       * Appends a value to the queue.
       *
       * @param mixed $data value to store
       * @return boolean true when stored; false when the lock could not be
       *                 taken, both sides are full, or the value key already exists
       */
      public function add($data)
      {
          if (!$this->getLock()) {
              return false;
          }
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          if ($this->isFull()) {
              $this->unLock();
              return false;
          }
          if ($this->currentTail >= self::MAXNUM) {
              // Current side is full but the other one is not: rotate while
              // still holding the lock so concurrent writers cannot flip twice.
              $this->changeCurrentSide();
              $this->getCurrentSide();
          }
          $valueKey = $this->queueName . $this->currentSide . self::VALU_KEY . $this->currentTail;
          $stored = memcache_add(self::$client, $valueKey, $data, false, $this->expire);
          if ($stored) {
              $this->changeTail();
          }
          $this->unLock();
          return $stored;
      }

      /**
       * Removes up to $length values from the queue and returns them.
       *
       * @param int $length number of values to take; 0 (default) takes everything
       * @return array|false the result of memcache_get() for the taken keys;
       *                     false when $length is not numeric, the lock could
       *                     not be taken, or the queue is empty
       */
      public function get($length = 0)
      {
          if (!is_numeric($length)) {
              return false;
          }
          if (empty($length)) {
              $length = self::MAXNUM * 2; // default: read everything
          }
          if (!$this->getLock()) {
              return false;
          }
          // getKeyArray() reloads head/tail from memcache, so the emptiness
          // check below sees values added since this object last refreshed.
          $range = $this->getKeyArray($length);
          if ($this->isEmpty()) {
              $this->unLock();
              return false;
          }
          $keys = $range['keys'];
          $this->advanceHead($this->lastSide, $this->lastHead, $this->lastTail, $range['lastCount']);
          $this->advanceHead($this->currentSide, $this->currentHead, $this->currentTail, $range['currentCount']);
          $data = @memcache_get(self::$client, $keys);
          foreach ($keys as $key) { // delete what has been taken
              @memcache_delete(self::$client, $key);
          }
          $this->unLock();
          return $data;
      }

      /**
       * Returns up to $length values without removing them. Takes no lock.
       *
       * @param int $length number of values to read; 0 (default) reads everything
       * @return array|false the result of memcache_get() for the selected keys;
       *                     false when $length is not numeric
       */
      public function read($length = 0)
      {
          if (!is_numeric($length)) {
              return false;
          }
          if (empty($length)) {
              $length = self::MAXNUM * 2; // default: read everything
          }
          $range = $this->getKeyArray($length);
          return @memcache_get(self::$client, $range['keys']);
      }

      /**
       * Reloads the queue state and lists the value keys of the next $length
       * entries: the previous side first, then the current side.
       *
       * @param int $length maximum number of keys
       * @return array 'keys' => list of value keys, 'lastCount' / 'currentCount'
       *               => number of keys taken from the previous / current side
       */
      private function getKeyArray($length)
      {
          $result = array('keys' => array(), 'lastCount' => 0, 'currentCount' => 0);
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          if (empty($length)) {
              return $result;
          }
          $lastKeys = $this->sideKeys($this->lastSide, $this->lastHead, $this->lastTail, $length);
          $currentKeys = $this->sideKeys(
              $this->currentSide,
              $this->currentHead,
              $this->currentTail,
              $length - count($lastKeys)
          );
          $result['keys'] = array_merge($lastKeys, $currentKeys);
          $result['lastCount'] = count($lastKeys);
          $result['currentCount'] = count($currentKeys);
          return $result;
      }

      /**
       * Builds the value keys for indexes $head .. $tail-1 of one side,
       * stopping after $limit keys.
       *
       * @param string $side  'A' or 'B'
       * @param int    $head  first index to include
       * @param int    $tail  index one past the last stored entry
       * @param int    $limit maximum number of keys
       * @return array list of value keys
       */
      private function sideKeys($side, $head, $tail, $limit)
      {
          $keys = array();
          for ($index = $head, $taken = 0; $index < $tail && $taken < $limit; $index++, $taken++) {
              $keys[] = $this->queueName . $side . self::VALU_KEY . $index;
          }
          return $keys;
      }

      /**
       * Increments the tail index of the current side, creating it when missing.
       *
       * @return void
       */
      private function changeTail()
      {
          $tailKey = $this->queueName . $this->currentSide . self::TAIL_KEY;
          memcache_add(self::$client, $tailKey, 0, false, $this->expire); // inserts only when missing
          $next = memcache_get(self::$client, $tailKey) + 1;
          memcache_set(self::$client, $tailKey, $next, false, $this->expire);
      }

      /**
       * Moves the head of a side forward after $count entries were taken.
       * When the side is used up (head reaches tail) its head and tail are
       * reset to 0 so the side can be filled again.
       *
       * Working from a count rather than from the index of the last taken
       * entry keeps "took entry 0" distinct from "took nothing".
       *
       * @param string $side  'A' or 'B'
       * @param int    $head  head index before taking
       * @param int    $tail  tail index of the side
       * @param int    $count number of entries taken from this side
       * @return void
       */
      private function advanceHead($side, $head, $tail, $count)
      {
          $newHead = $head + $count;
          if ($newHead < $tail) {
              if ($count > 0) {
                  $headKey = $this->queueName . $side . self::HEAD_KEY;
                  memcache_set(self::$client, $headKey, $newHead, false, $this->expire);
              }
              return;
          }
          if ($tail > 0) {
              $this->resetSide($side);
          }
      }

      /**
       * Resets a side by setting its head and tail indexes to 0.
       *
       * @param string $side 'A' or 'B'
       * @return void
       */
      private function resetSide($side)
      {
          $headKey = $this->queueName . $side . self::HEAD_KEY;
          $tailKey = $this->queueName . $side . self::TAIL_KEY;
          memcache_set(self::$client, $headKey, 0, false, $this->expire);
          memcache_set(self::$client, $tailKey, 0, false, $this->expire);
      }

      /**
       * Switches the current side stored in memcache from A to B or back.
       *
       * @return string name of the new current side
       */
      private function changeCurrentSide()
      {
          $sideKey = $this->queueName . self::SIDE_KEY;
          $stored = memcache_get(self::$client, $sideKey);
          $this->currentSide = ($stored === 'A') ? 'B' : 'A';
          memcache_set(self::$client, $sideKey, $this->currentSide, false, $this->expire);
          return $this->currentSide;
      }

      /**
       * Tells whether both sides hold MAXNUM entries, based on the tail
       * values loaded by the most recent getHeadNTail() call.
       *
       * @return boolean
       */
      public function isFull()
      {
          return $this->sideATail >= self::MAXNUM && $this->sideBTail >= self::MAXNUM;
      }

      /**
       * Tells whether both tails are 0, based on the tail values loaded by
       * the most recent getHeadNTail() call.
       *
       * @return boolean
       */
      public function isEmpty()
      {
          return !($this->sideATail > 0 || $this->sideBTail > 0);
      }

      /**
       * Returns the queue length computed from the head/tail indexes.
       * This is a theoretical length: entries that have expired are still
       * counted, so the real length is less than or equal to it.
       *
       * @return int
       */
      public function getQueueLength()
      {
          $this->getHeadNTail($this->queueName);
          $this->getCurrentSide();
          $sideALength = $this->sideATail - $this->sideAHead;
          $sideBLength = $this->sideBTail - $this->sideBHead;
          return $sideALength + $sideBLength;
      }

      /**
       * Deletes every value key of both sides and resets their head/tail
       * indexes to 0. The HEAD_KEY, TAIL_KEY and SIDE_KEY entries are kept.
       *
       * @return boolean false when the lock could not be taken, true otherwise
       */
      public function clear()
      {
          if (!$this->getLock()) {
              return false;
          }
          for ($i = 0; $i < self::MAXNUM; $i++) {
              @memcache_delete(self::$client, $this->queueName . 'A' . self::VALU_KEY . $i);
              @memcache_delete(self::$client, $this->queueName . 'B' . self::VALU_KEY . $i);
          }
          // Reset the counters before releasing the lock so that a concurrent
          // add() cannot append in between and have its tail wiped.
          $this->resetSide('A');
          $this->resetSide('B');
          $this->unLock();
          return true;
      }

      /**
       * Flushes ALL data on the memcache server, not only this queue.
       *
       * @return void
       */
      public function memFlush()
      {
          memcache_flush(self::$client);
      }
  }
Copy code

php, memcached


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn