PHP memcache 環形佇列類別。新手,沒咋學過資料結構,因為業務需要,所以只是硬著頭皮模擬的! 原形是 oschina上 lusi 分享的PHP memcache 佇列程式碼。為使隊列隨時可入可出,且不受int長度越界危險(單鏈採取Head自增的話不作處理有越界可能),所以索性改寫成環形隊列。可能還有BUG,忘見諒!
- /**
- * PHP memcache 環形隊列類
- * 原作者LKK/lianq.net
- * 修改FoxHunter
- * 因業務需要只保留的隊列中的Pop和Push,修改過期時間為0即永久
- */
- class MQueue
- {
- public static $client;
-
- private $expire; //過期時間,秒,1~2592000,即30天內
- private $sleepTime; //等待解鎖時間,微秒
- private $queueName; //隊列名稱,唯一值
- private $retryNum; //嘗試次數
- private $MAXNUM; //最大隊列容量
- private $canRewrite; //是否可以覆蓋開關,滿出來的內容從頭部開始覆蓋重寫原來的資料
-
- private $HEAD; //下一步要進入的指標位置
- private $TAIL; //下一步要進入的指標位置
- private $LEN; //佇列現有長度
-
- const LOCK_KEY = '_Fox_MQ_LOCK_'; //鎖定儲存標示
- const LENGTH_KEY = '_Fox_MQ_LENGTH_'; //佇列長度儲存標示
- constst //值佇列值 =標示
- const HEAD_KEY = '_Fox_MQ_HEAD_'; //佇列HEAD指標位置標示
- const TAIL_KEY = '_Fox_MQ_TAIL_'; //TAILIL_1,
-
-
- * 程式碼> 🎜> * 對於同一個$queueName,實例化時必須保障建構子的參數值一致,否則pop和push會導佇列順序混亂
- */
- public function __construct($queueName = '', $maxqueue = 1, $canRewrite = false, $expire = 0, $config = '')
- {
- if (empty($config)) {
- self::$client = memcache_pconnect('127.0.0.1 ', 11211);
- } elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')
- self::$client = memcache_pconnect($config['host'], $config['port']);
- } elseif (is_string($config)) { //"127.0.0.1:11211"
- $tmp = explode( ':', $config);
- $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
- $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
- self::$client = memcache_pconnect($conf['host'], $conf['port']);
- }
- if (!self::$client)
- return false;
- ignore_user_abort(true); //當客戶斷開連接,允許繼續執行
- set_time_limit(0); //取消腳本執行延遲上限
- $this->access = false;
- $this->sleepTime = 1000;
- $expire = (empty($expire)) ? 0 : (int) $expire 1;
- $this->expire = $expire;
- $this->queueName = $queueName;
- $this->retryNum = 20000;
- $this->MAXNUM = $maxqueue != null ? $maxqueue : 1;
- $this->canRewrite = $canRewrite;
- $this->getHeadAndTail();
- if (!isset($this->HEAD) || empty($this-> HEAD))
- $this->HEAD = 0;
- if (!isset($this->TAIL) || empty($this->TAIL))
- $this->TAIL = 0;
- if (!isset($this->LEN) || empty($this->LEN))
- $this->LEN = 0;
-
- }
-
- / /取得佇列首尾指標資訊與長度
- private function getHeadAndTail()
- {
- $this->HEAD = (int) memcache_get(self::$client, $this->queueName . self::HEmemcache_get(self::$client, $this->queueName . self::HEAD_KEY );
- $this->TAIL = (int) memcache_get(self::$client, $this->queueName . self::TAIL_KEY);
- $this->LEN = (int) memcache_get(self: :$client, $this->queueName . self::LENGTH_KEY);
- }
-
-
- // 利用memcache_add原子性加鎖
- private function lock()
-
- {
- if ($this->access === false) {
- $i = 0;
- while (!memcache_add(self::$client, $this->queueName . self::LOCK_KEY, 1, false, $this->expire)) {
- usleep($this->sleepTime);
- @$i ;
- if ($i > $this->retryNum) { //嘗試等待N次
- return false;
- break;
- }
- }
- return $this->access = true;
- }
- return false;
- }
- }
- return false;
- }
- }
- return false;
- }
- }
- return false; } } return false; } } return false; } } return false; } } return false; } } return false; }} > //更新頭部指標指向,指向下一個位置 private function incrHead() { //$this->getHeadAndTail(); //取得最新指標資訊,由於本方法體均在鎖內調用,其鎖內已調用了此方法,本行註釋 $this->HEAD ; //頭部指針下移 if ($this->HEAD >= $this->MAXNUM ) { $this->HEAD = 0; //邊界值修正 }
- ;
- $this->LEN--; //Head的移動由Pop觸發,所以相當於數量減少
- if ($this->LEN $this-> LEN = 0; //邊界值修正
- }
- ;
- memcache_set(self::$client, $this->queueName . self::HEAD_KEY, $this->HEAD, false, $this- >expire); //更新
- memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, false, $this->expire); //更新
-
- }
-
- //更新尾部指標指向,指向下一個位置
- private function incrTail()
- {
-
- //$this->getHeadAndTail(); / /取得最新指針訊息,由於本方法體均在鎖內調用,其鎖內已調用了此方法,本行註釋
- $this->TAIL ; //尾部指針下移
- if ($this ->TAIL >= $this->MAXNUM) {
- $this->TAIL = 0; //邊界值修正
- }
- ;
- $this->LEN ; //Head的移動由Push觸發,所以相當於數量增加
- if ($this->LEN >= $this->MAXNUM) {
- $this->LEN = $this->MAXNUM; //邊界值長度修正
- }
- ;
- memcache_set(self::$client, $this->queueName . self::TAIL_KEY, $this->TAIL, false, $this->expire); //更新
- memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, false, $this->expire); //更新
- }
-
-
- // 解鎖
- private function unLock()
- {
- memcache_delete(self::$client, $this->queueName . self::LOCK_KEY);
- $this->access = false;
- }
-
- //判斷是否滿隊列
- public function isFull()
- {
- //外部直接呼叫的時候由於沒有鎖定所以此處的值是個大概值,並不很準確,但內部呼叫由於在前面有lock,所以可信
- if ($this->canRewrite)
- return false;
- return $this->LEN == $this->MAXNUM ? true : false;
- }
-
- //判斷是否為空
- public function isEmpty()
- {
- //外部直接呼叫的時候由於沒有鎖定所以此處的值是個大概值,不太準確,但是內部呼叫由於在前面有lock,所以可信
- return $this->LEN == 0 ? true : false;
- }
-
- public function getLen()
- {
- //外部直接呼叫的時候由於沒有鎖所以此處的值是個大概值,並不很準確,但是內部呼叫由於在前面有lock,所以可信
- return $this->LEN;
- }
-
- /*
- * push值
- * @param mixed 值
- * @return bool
- */
- public function push ($data = '')
- {
-
- $result = false;
- if (empty($data))
- return $result;
-
- if (!$ this->lock()) {
- return $result;
- }
-
- $this->getHeadAndTail(); //取得最新指標資訊
-
- if ($this- >isFull()) { //只有在非覆寫下才有Full概念
- $this->unLock();
- return false;
- }
-
- if (memcache_set(self ::$client, $this->queueName . self::VALU_KEY . $this->TAIL, $data, MEMCACHE_COMPRESSED, $this->expire)) {
- //當推送後,發現尾部和頭部重疊(此時指針還未移動),且右邊仍有未由Head讀取的數據,那麼移動Head指針,避免尾部指針跨越Head
- if ($this->TAIL == $this->HEAD && $ this->LEN >= 1) {
- $this->incrHead();
- }
- $this->incrTail(); //移動尾部指針
- $result = true;
- }
-
- $this->unLock();
- return $result;
- }
-
-
- /*
- * Pop一個值
- * @ param [length] int 佇列長度
- * @return array
- */
- public function pop($length = 0)
- {
- if (!is_numeric($length))
- return false;
-
- if (!$this->lock())
- return false;
-
- $this->getHeadAndTail();
-
- if (empty( $length))
- $length = $this->LEN; //預設讀取所有
-
- if ($this->isEmpty()) {
- $this->unLock();
- return false;
- }
- //取得長度超出佇列長度後修正
- if ($length > $this->LEN)
- $length = $this->LEN;
-
- $data = $this->popKeyArray($length);
- $this->unLock();
- return $data;
- }
-
-
- /*
- * pop某段長度的值
- * @param [length] int 佇列長度
- * @return array
- */
- private ftion popKeyArray($length)
- {
-
- $result = array();
- if (empty($length))
- return $result;
- for ($k = 0; $k $result[] = @memcache_get(self::$client, $this->queueName . self::VALU_KEY . $this->HEAD);
- @memcache_delete (self::$client, $this->queueName . self::VALU_KEY . $this->HEAD, 0);
- //當提取值後,發現頭部和尾部重合(此時指針還未移動),且右邊沒有數據,即隊列中最後一個數據被完全掏空,此時指針停留在本地不移動,隊列長度變為0
- if ($this->TAIL == $this->HEAD && $this->LEN $this->LEN = 0;
- memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, $this->LEN, false , $this->expire); //更新
- break;
- } else {
- $this->incrHead(); //首尾未重合,或者重合但是仍有未讀取出的數據,皆移動HEAD指標到下一個待讀取位置
- }
- }
- return $result;
- }
-
- /*
- * 重設佇列
- * * @return NULL
- */
- private function reset($all = false)
- {
-
- if ($all) {
- memcache_delete(self::$client, $ this->queueName . self::HEAD_KEY, 0);
- memcache_delete(self::$client, $this->queueName . self::TAIL_KEY, 0);
- memcache_delete(self::$client, $f::$client, $ this->queueName . self::LENGTH_KEY, 0);
- } else {
- $this->HEAD = $this->TAIL = $this->LEN = 0;
- meacheache_set(selfmc:: $client, $this->queueName . self::HEAD_KEY, 0, false, $this->expire);
- memcache_set(self::$client, $this->queueName . self::TAIL_KEY, 0, false , $this->expire);
- memcache_set(self::$client, $this->queueName . self::LENGTH_KEY, 0, false, $this->expire);
- }
- }
-
- /*
- * 清除所有memcache快取資料
- * @return NULL
- */
- public function memFlush()
- {
- meachebdush$ );
- }
-
-
- public function clear($all = false)
- {
- if (!$this->lock())
- return false;
- $this->getHeadAndTail();
- $Head = $this->HEAD;
- $Length = $this->LEN;
- $curr = 0;
- for ($i = 0 ; $i $curr = $this->$Head $i;
- if ($curr >= $this->MAXNUM) {
- $this->HEAD = $curr = 0;
- }
- @memcache_delete(self::$client, $this->queueName . self::VALU_KEY . $curr, 0);
- }
-
-
- $this->unLock();
- $this->reset($all);
- return true;
- }
-
-
- }
複製程式碼
|