首頁  >  文章  >  後端開發  >  php實作的memcached佇列類

php實作的memcached佇列類

WBOY
WBOY原創
2016-07-25 08:43:27882瀏覽
PHP、記憶體快取
  1. /*
  2. * memcache佇列類別
  3. * 支援多重行程並發寫入、讀取
  4. * 邊寫邊讀,AB面輪值替換
  5. * @author guoyu
  6. * @create on 9:25 2014-9-28
  7. * @qq科技業交流群:136112330
  8. *
  9. * @example:
  10. * $obj = new memcacheQueue('duilie');
  11. * $obj->add('1asdf');
  12. * $obj->getQueueLength();
  13. * $obj->read( 11);
  14. * $obj->get(8);
  15. */
  16. class memcacheQueue{
  17. public static $client; //memcache客戶端連線
  18. public $access ; //佇列是否可更新
  19. private $currentSide; //目前輪值的佇列面:A/B
  20. private $lastSide; //上一輪值的佇列面:A/B
  21. private $sideAHead ; //A面隊首值
  22. private $sideATail; //A面隊尾值
  23. private $sideBHead; //B面隊首值
  24. private $sideBTail; //B面隊尾值
  25. private $currentHead; //目前隊首值
  26. private $currentTail; //目前隊尾值
  27. private $lastHead; //上輪隊首值
  28. private $lastTail; //上 $輪隊尾值
  29. private $expire; //過期時間,秒,1~2592000,即30天內;0為永不過期
  30. private $sleepTime; //等待解鎖時間,微秒
  31. private $queueName; //佇列名稱,唯一值
  32. private $retryNum; //重試次數,= 10 * 理論並發數
  33. const MAXNUM = 2000; //(單面)最大佇列數,建議上限10K
  34. const HEAD_KEY = '_lkkQueueHead_'; //佇列首kye
  35. const TAIL_KEY = '_lkkQueueTail_'; //佇列尾key
  36. constnLU_KEY = 'lkkQuue > const LOCK_KEY = '_lkkQueueLock_'; //佇列鎖定key
  37. const SIDE_KEY = '_lkkQueueSide_'; //輪值面key
  38. /*
  39. * 建構子
  40. **
  41. * 建構子
  42. *param ] array memcache伺服器參數
  43. * @param [queueName] string 佇列名稱
  44. * @param [expire] string 過期時間
  45. * @return NULL
  46. */
  47. public function __construct($queue ='',$expire='',$config =''){
  48. if(empty($config)){
  49. self::$client = memcache_pconnect('localhost',11211);
  50. }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
  51. self::$client = memcache_pconnect($config['host '],$config['port']);
  52. }elseif(is_string($config)){//"127.0.0.1:11211"
  53. $tmp = explode(':',$config);
  54. $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
  55. $conf['port'] = isset($tmp[1] ) ? $tmp[1] : '11211';
  56. self::$client = memcache_pconnect($conf['host'],$conf['port']);
  57. }
  58. if(! self::$client) return false;
  59. ignore_user_abort(TRUE);//當客戶斷開連接,允許繼續執行
  60. set_time_limit(0);//取消腳本執行延遲上限
  61. $this->access = false;
  62. $this->sleepTime = 1000;
  63. $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
  64. $this->expire = $expire;
  65. $this->queueName = $queueName;
  66. $this->retryNum = 10000;
  67. $side = memcache_add(self::$ client, $queueName . self::SIDE_KEY, 'A',false, $expire);
  68. $this->getHeadNTail($queueName);
  69. if(!isset($this->sideAHead) || empty ($this->sideAHead)) $this->sideAHead = 0;
  70. if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0; if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0; if(!isset($this->sideBHead) || empty( $this->sideBHead)) $this->sideBHead = 0; }
  71. /*
  72. * 取得佇列首尾值
  73. * @param [queueName] string 佇列名稱
  74. * @return NULL
  75. */
  76. private function getHeadNTail($queueue) {
  77. $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'.self::HEAD_KEY);
  78. $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'.self::TAIL_KEY);
  79. $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'.self::HEAD_KEY);
  80. $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'.self::TAIL_KEY);
  81. }
  82. /*
  83. * 取得目前輪值的佇列面
  84. * @return string 佇列面名稱
  85. */
  86. public function getCurrentSide(){
  87. (){
  88. $ currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
  89. if($currentSide == 'A'){
  90. $this->currentSide = 'A';
  91. $this->lastSide = 'B';
  92. $this->currentHead = $this->sideAHead;
  93. $this->currentTail = $this->sideATail;
  94. $this->lastHead = $this->sideBHead;
  95. $this->lastTail = $this->sideBTail;
  96. }else{
  97. $this->currentSide = 'B';
  98. $this->lastSide = 'A';
  99. $this->currentHead = $this->sideBHead;
  100. $this->currentTail = $this->sideBTail;
  101. $this->lastHead = $this->sideAHead;
  102. $this->lastTail = $this->sideATail;
  103. }
  104. return $this->currentSide;
  105. }
  106. /*
  107. * 佇列加鎖
  108. * @return boolean
  109. */
  110. private function getLock(){
  111. if($this->;存取=== false){
  112. while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
  113. usleep ($這個->睡眠時間);
  114. @$i;
  115. if($i > $this->retryNum){//嘗試等待N次
  116. return false;
  117. 休息;
  118. }
  119. }
  120. return $this->access = true;
  121. }
  122. 回傳 false;
  123. }
  124. /*
  125. * 佇列解鎖
  126. * @return NULL
  127. */
  128. private function unLock({🎜> private function unLock({ 🎜> memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
  129. $this->access = false;
  130. }
  131. /*
  132. * 新增資料
  133. * @param [data] 要儲存的值
  134. * @return boolean
  135. */
  136. public function add( $data){
  137. $結果= false;
  138. if(!$this->getLock()){
  139. return $result;
  140. }
  141. $this->getHeadNTail($this->queueName);
  142. $this->getCurrentSide();
  143. if($this->isFull()){
  144. $this->unLock();
  145. 回傳 false;
  146. }
  147. if($this->currentTail $value_key = $this->queueName .$this->currentSide . self::VALU_KEY 。
  148. if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
  149. $this->changeTail();
  150. $結果 = true;
  151. }
  152. }else{//目前佇列已滿,換輪值面
  153. $this->unLock();
  154. $this->changeCurrentSide();
  155. return $this->add($data);
  156. }
  157. $this->unLock();
  158. 回傳$結果;
  159. }
  160. /*
  161. * 資料擷取
  162. * @param [length] int 資料的長度
  163. * @return array
  164. */
  165. public function get( $length=0){
  166. if(!is_numeric($length)) return false;
  167. if(empty($length)) $length = self::MAXNUM * 2;//預設讀取所有
  168. if(!$this->getLock()) return false;
  169. if($this->isEmpty()){
  170. $this->unLock();
  171. 回傳 false;
  172. }
  173. $keyArray = $this->getKeyArray($length);
  174. $lastKey = $keyArray['lastKey'];
  175. $currentKey = $keyArray['currentKey'];
  176. $keys = $keyArray['keys'];
  177. $this->changeHead($this->lastSide,$lastKey);
  178. $this->changeHead($this->currentSide,$currentKey);
  179. $data = @memcache_get(self::$client, $keys);
  180. foreach($keys as $v){//取出後刪除
  181. @memcache_delete(self::$client,self::$client, $v, 0);
  182. }
  183. $this->unLock();
  184. 回傳$data;
  185. }
  186. /*
  187. * 讀取資料
  188. * @param [length] int 資料的長度
  189. * @return array
  190. */
  191. public function read ($length=0){
  192. if(!is_numeric($length)) return false;
  193. if(empty($length)) $length = self::MAXNUM * 2;//預設讀取所有
  194. $keyArray = $this->getKeyArray($length);
  195. $ data = @memcache_get(self::$client, $keyArray['keys']);
  196. 傳回$資料;
  197. }
  198. /*
  199. * 取得佇列某段長度的密實取得佇列某段長度的密實取得佇列某段長度的密實取得佇列某段長度的密實取得佇列某段長度鑰數組
  200. * @param [length] int 佇列長度
  201. * @return array
  202. */
  203. private function getKeyArray($length){
  204. $result = array('keys'=> array(),'lastKey'=>array(),'currentKey'=>array());
  205. $this->getHeadNTail($this->queueName);
  206. $this->getCurrentSide();
  207. if(empty($length)) return $結果;
  208. // 先取上邊的key
  209. $i = $result['lastKey'] = 0;
  210. for($i =0;$i $result['lastKey'] = $this->lastHead $i;
  211. if($result['lastKey'] >= $this-> lastTail) 中斷;
  212. $result['keys'][] = $this->queueName .$this->lastSide 。 self::VALU_KEY 。 $結果['lastKey'];
  213. }
  214. // 再取當前面的key
  215. $j = $length - $i;
  216. $k = $result['currentKey' ] = 0;
  217. for($k=0;$k $result['currentKey'] = $this->currentHead $k;
  218. if($result ['currentKey'] >= $this->currentTail) 中斷;
  219. $result['keys'][] = $this->queueName .$this->currentSide 。 self::VALU_KEY 。 $結果['當前Key'];
  220. }
  221. 回傳$結果;
  222. }
  223. /*
  224. *更新目前輪值面隊列尾的值
  225. * @return NULL
  226. */
  227. private function changeTail(){
  228. $tail_key = $this->queueName .$this->currentSide 。自我::TAIL_KEY;
  229. memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果沒有,則插入;有則false;
  230. //memcache_increment(self ::$client, $tail_key, 1);//佇列尾1
  231. $v = memcache_get(self::$client, $tail_key) 1;
  232. memcache_set(self::$client, $tail_key,$ v,false,$this->expire);
  233. }
  234. /*
  235. * 更新佇列首的值
  236. * @param [side] string 要更新的面
  237. * @param [headValue] int 佇列首的值
  238. * @return NULL
  239. */
  240. private function changeHead($side,$headValue){
  241. if($headValue $head_key = $this->queueName .$side 。自我::HEAD_KEY;
  242. $tail_key = $this->queueName .$side 。自我::TAIL_KEY;
  243. $sideTail = memcache_get(self::$client, $tail_key);
  244. if($headValue memcache_set(self::$client, $head_key,$ headValue 1,false,$this->expire);
  245. }elseif($headValue >= $sideTail){
  246. $this->resetSide($side);
  247. }
  248. }
  249. }
  250. /*
  251. * 重置隊列面,即將該隊列面的隊頭、隊尾值要置為0
  252. * @param [side] string重置的面
  253. * @return NULL
  254. */
  255. private function resetSide($side){
  256. $head_key = $this->queueName .$side .自我::HEAD_KEY;
  257. $tail_key = $this->queueName .$side 。自我::TAIL_KEY;
  258. memcache_set(self::$client, $head_key,0,false,$this->expire);
  259. memcache_set(self::$client, $tail_key,0,false,$this ->expire); }
  260. /*
  261. * 改變目前輪值佇列面
  262. * @return string
  263. */
  264. private function changeCurrentSide(){
  265. $currentSget客戶端, $this->queueName 。 ,'B',false,$this->expire) ;
  266. $this->currentSide = 'B';
  267. }else{
  268. memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
  269. $this->currentSide = 'A';
  270. }
  271. return $this->currentSide;
  272. }
  273. /*
  274. * 檢查目前佇列是否已滿
  275. * @return boolean
  276. */
  277. public function isFull(){
  278. $result = false ;
  279. ifif ($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
  280. $result = true;
  281. }
  282. 回傳$結果;
  283. }
  284. /*
  285. * 檢查目前佇列是否為空
  286. * @return boolean
  287. */
  288. public function isEmpty(){
  289. $result = true ;
  290. true ; ($this->sideATail > 0 || $this->sideBTail > 0){
  291. $result = false;
  292. }
  293. 回傳$結果;
  294. }
  295. /*
  296. * 取得目前佇列的長度
  297. * 長度為理論長度,某些元素因過渡失效而遺失,真實長度小於或等於該長度
  298. * @return int
  299. */
  300. public function getQueueLength(){
  301. $this->getHeadNTail($this->queueName);
  302. $this->getCurrentSide();
  303. $sideALength = $this->ATail - $this- >sideAHead;
  304. $sideBLength = $this->sideBTail - $this->sideBHead;
  305. $結果= $sideALength $sideBLength;
  306. 回傳$結果;
  307. }
  308. /*
  309. * 清空目前佇列數據,只保留HEAD_KEY、TAIL_KEY、SIDE_KEY三個鍵
  310. * @return boolean
  311. */
  312. public function clear( ){
  313. if(! $this->getLock()) return false;
  314. for($i=0;$i<:maxnum> @memcache_delete(self::$client, $this->queueName .'A'. self::VALU_KEY 。 $i, 0);
  315. @memcache_delete(self::$client, $this->queueName.'B'.self::VALU_KEY .$i, 0);
  316. }
  317. $this-> unLock();
  318. $this->resetSide('A');
  319. $this->resetSide('B');
  320. 回傳true;
  321. }
  322. /*
  323. * 清除所有memcache伺服器資料
  324. * @return NULL
  325. */
  326. public function memFlush(){
  327. memcache_flush(self:: $客戶);
  328. }
  329. }
複製程式碼

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn