Maison > Article > développement back-end > Explication détaillée des exemples de code de verrouillage distribué et de file d'attente de tâches PHP Redis
1. Redis implémente des idées de verrouillage distribué
L'idée est très simple. La principale fonction Redis utilisée est setnx(), qui doit être utilisée pour implémenter. serrures distribuées. La fonction la plus importante du verrou. La première consiste à stocker un certain nom d'identification de tâche (ici Lock:order est utilisé comme exemple de nom d'identification) comme clé dans Redis et à définir un délai d'expiration pour celui-ci. S'il y a une autre demande Lock:order, passez d'abord. setnx() Voyez si Lock:order peut être inséré dans redis. Si c'est le cas, il retournera vrai, sinon, il retournera faux. Bien sûr, mon code sera plus compliqué que cette idée, et je l'expliquerai plus en détail lors de l'analyse du code.
2. Redis implémente la file d'attente des tâches
L'implémentation ici utilisera le mécanisme de verrouillage distribué Redis ci-dessus, utilisant principalement Redis La structure de données d'un ensemble commandé. Par exemple, lorsque vous rejoignez la file d'attente, utilisez la fonction add() de zset pour rejoindre la file d'attente, et lorsque vous quittez la file d'attente, vous pouvez utiliser la fonction getScore() de zset. De plus, plusieurs tâches en haut peuvent apparaître.
3. Analyse du code
(1) Analysons d'abord l'implémentation du code du verrouillage distribué Redis
(1) Afin d'éviter que le verrou ne puisse pas être libéré pour des raisons particulières, une fois le verrou verrouillé avec succès, le verrou se verra attribuer un temps de survie (via le paramétrage de la méthode de verrouillage ou en utilisant la valeur par défaut ). Si le temps de survie est dépassé, le verrou sera libéré automatiquement. La durée de vie du verrou est courte par défaut (secondes). Par conséquent, si vous devez verrouiller pendant une longue période, vous pouvez utiliser l'expiration. méthode pour prolonger la durée de vie du verrou jusqu'à une durée appropriée, comme dans une boucle.
(2) Verrous au niveau du système. Lorsque le processus plante pour une raison quelconque, le système d'exploitation recyclera les verrous par lui-même, il n'y aura donc pas de perte de ressources, mais les verrous distribués ne seront pas utilisés. -le réglage du temps est très long. Une fois qu'un crash de processus ou une autre exception se produit pour diverses raisons et que le déverrouillage n'est pas appelé, le verrou deviendra un verrou poubelle dans le temps restant, empêchant d'autres processus ou processus d'entrer dans le verrouillé. zone après le redémarrage.
Examinons d'abord le code d'implémentation du verrouillage : deux paramètres principaux sont nécessaires ici, l'un est $timeout, qui est le temps d'attente pour acquérir le verrou de manière cyclique. Pendant ce temps, il continuera à essayer d'acquérir le verrou. jusqu'à ce qu'il expire. S'il vaut 0 , cela signifie revenir directement après avoir échoué à acquérir le verrou sans attendre ; un autre paramètre important est $expire, ce paramètre fait référence à la durée de survie maximale du verrou actuel, en secondes, elle doit être supérieure. supérieur à 0, si le temps de survie est dépassé et que le verrou n'a pas été déverrouillé, le système forcera automatiquement le déverrouillage. Veuillez consulter l'explication en (1) ci-dessus pour la fonction la plus importante de ce paramètre.
Ici, nous obtenons d'abord l'heure actuelle, puis le délai d'attente lorsque le verrou échoue (il s'agit d'un horodatage), puis obtenons le temps de survie maximal du verrou. La clé de redis utilise ici ce format : "Lock : nom d'identification du verrou". Tout d'abord, insérez les données dans redis, et utilisez la fonction setnx(). Autrement dit, si la clé n'existe pas, insérez les données et stockez la durée de survie maximale sous forme de valeur. Si l'insertion réussit, définissez le délai d'expiration de la clé et placez la clé dans le tableau $lockedName, ce qui signifie. le verrouillage est réussi ; Si la clé existe, l'opération d'insertion ne sera pas effectuée ici, qui consiste à obtenir le temps restant de la clé actuelle. Si ce temps est inférieur à 0, cela signifie qu'il y a. aucun temps de survie n'est défini sur la clé (la clé n'existera pas), car le setnx précédent la créera automatiquement) Si cette situation se produit, c'est qu'une certaine instance du processus se bloque après la réussite de setnx, ce qui entraîne l'expiration ultérieure non étant appelé, vous pouvez directement définir l'expiration et utiliser le verrou pour votre propre usage. Si le temps d'attente en cas d'échec du verrouillage n'est pas défini ou si le temps d'attente maximum a été dépassé, quittez la boucle. Sinon, continuez la requête après $waitIntervalUs. Il s’agit de l’analyse complète du code du verrouillage.
/** * 加锁 * @param [type] $name 锁的标识名 * @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待 * @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放 * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; //取得当前时间 $now = time(); //获取锁失败时的等待超时时刻 $timeoutAt = $now + $timeout; //锁的最大生存时刻 $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; while (true) { //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放 $result = $this->redisString->setnx($redisKey, $expireAt); if ($result != false) { //设置key的失效时间 $this->redisString->expire($redisKey, $expireAt); //将锁标志放到lockedNames数组里 $this->lockedNames[$name] = $expireAt; return true; } //以秒为单位,返回给定key的剩余生存时间 $ttl = $this->redisString->ttl($redisKey); //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 //这时可以直接设置expire并把锁纳为己用 if ($ttl < 0) { $this->redisString->set($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; } /*****循环请求锁部分*****/ //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; //隔 $waitIntervalUs 后继续 请求 usleep($waitIntervalUs); } return false; }
Ensuite, regardons l'analyse du code de déverrouillage : Le déverrouillage est beaucoup plus simple. Le paramètre entrant est l'identifiant du verrou. Tout d'abord, déterminez si le verrou existe. S'il existe, supprimez-le de Redis. fonction deleteKey(). Verrouillez simplement le logo.
/** * 解锁 * @param [type] $name [description] * @return [type] [description] */ public function unlock($name) { //先判断是否存在此锁 if ($this->isLocking($name)) { //删除锁 if ($this->redisString->deleteKey("Lock:$name")) { //清掉lockedNames里的锁标志 unset($this->lockedNames[$name]); return true; } } return false; } 在贴上删除掉所有锁的方法,其实都一个样,多了个循环遍历而已。 /** * 释放当前所有获得的锁 * @return [type] [description] */ public function unlockAll() { //此标志是用来标志是否释放所有锁成功 $allSuccess = true; foreach ($this->lockedNames as $name => $expireAt) { if (false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; }
Ce qui précède est un résumé et un partage de l'ensemble des idées et des implémentations de code liées à l'utilisation de Redis pour implémenter des verrous distribués. Ici, je joins le code d'une classe d'implémentation. Dans le code, j'effectue essentiellement. a Des annotations sont fournies afin que chacun puisse comprendre et simuler rapidement l'application. Si vous souhaitez en savoir plus, veuillez consulter le code de la classe entière :
/** *在redis上实现分布式锁 */ class RedisLock { private $redisString; private $lockedNames = []; public function construct($param = NULL) { $this->redisString = RedisFactory::get($param)->string; } /** * 加锁 * @param [type] $name 锁的标识名 * @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待 * @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放 * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; //取得当前时间 $now = time(); //获取锁失败时的等待超时时刻 $timeoutAt = $now + $timeout; //锁的最大生存时刻 $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; while (true) { //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放 $result = $this->redisString->setnx($redisKey, $expireAt); if ($result != false) { //设置key的失效时间 $this->redisString->expire($redisKey, $expireAt); //将锁标志放到lockedNames数组里 $this->lockedNames[$name] = $expireAt; return true; } //以秒为单位,返回给定key的剩余生存时间 $ttl = $this->redisString->ttl($redisKey); //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 //这时可以直接设置expire并把锁纳为己用 if ($ttl < 0) { $this->redisString->set($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; } /*****循环请求锁部分*****/ //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; //隔 $waitIntervalUs 后继续 请求 usleep($waitIntervalUs); } return false; } /** * 解锁 * @param [type] $name [description] * @return [type] [description] */ public function unlock($name) { //先判断是否存在此锁 if ($this->isLocking($name)) { //删除锁 if ($this->redisString->deleteKey("Lock:$name")) { //清掉lockedNames里的锁标志 unset($this->lockedNames[$name]); return true; } } return false; } /** * 释放当前所有获得的锁 * @return [type] [description] */ public function unlockAll() { //此标志是用来标志是否释放所有锁成功 $allSuccess = true; foreach ($this->lockedNames as $name => $expireAt) { if (false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; } /** * 给当前所增加指定生存时间,必须大于0 * @param [type] $name [description] * @return [type] [description] */ public function expire($name, $expire) { //先判断是否存在该锁 if ($this->isLocking($name)) { //所指定的生存时间必须大于0 $expire = max($expire, 1); //增加锁生存时间 if ($this->redisString->expire("Lock:$name", $expire)) { return true; } } return false; } /** * 判断当前是否拥有指定名字的所 * @param [type] $name [description] * @return boolean [description] */ public function isLocking($name) { //先看lonkedName[$name]是否存在该锁标志名 if (isset($this->lockedNames[$name])) { //从redis返回该锁的生存时间 return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name"); } return false; } }
(2) Analyse du code de l'utilisation de Redis pour implémenter la file d'attente des tâches
(1) File d'attente de tâches, utilisée pour placer les opérations qui peuvent être traitées de manière asynchrone dans la logique métier dans la file d'attente, puis les retirer de la file d'attente après traitement dans d'autres threads
(2) Verrous distribués et d'autres logiques sont utilisées dans la file d'attente, Assurer la cohérence entre rejoindre et quitter l'équipe
(3)这个队列和普通队列不一样,入队时的id是用来区分重复入队的,队列里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求要求重复入队当做不用的任务,请使用不同的id区分
先看入队的代码分析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,就是开始加锁,入队时我这里选择当前时间戳作为score,接着就是入队了,使用的是zset数据结构的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。
/** * 入队一个 Task * @param [type] $name 队列名称 * @param [type] $id 任务id(或者其数组) * @param integer $timeout 入队超时时间(秒) * @param integer $afterInterval [description] * @return [type] [description] */ public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) { //合法性检测 if (empty($name) || empty($id) || $timeout <= 0) return false; //加锁 if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) { Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); return false; } //入队时以当前时间戳作为 score $score = microtime(true) + $afterInterval; //入队 foreach ((array)$id as $item) { //先判断下是否已经存在该id了 if (false === $this->_redis->zset->getScore("Queue:$name", $item)) { $this->_redis->zset->add("Queue:$name", $score, $item); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return true; }
接着来看一下出队的代码分析:出队一个Task,需要指定它的$id 和 $score,如果$score与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了,先使用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行对比,如果两者相等就进行出队操作,也就是使用zset里的delete()方法删掉该任务id,最后当前就是解锁了。这就是出队的代码分析。
/** * 出队一个Task,需要指定$id 和 $score * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理 * * @param [type] $name 队列名称 * @param [type] $id 任务标识 * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队 * @param integer $timeout 超时时间(秒) * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过) */ public function dequeue($name, $id, $score, $timeout = 10) { //合法性检测 if (empty($name) || empty($id) || empty($score)) return false; //加锁 if (!$this->_redis->lock->lock("Queue:$name", $timeout)) { Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); return false; } //出队 //先取出redis的score $serverScore = $this->_redis->zset->getScore("Queue:$name", $id); $result = false; //先判断传进来的score和redis的score是否是一样 if ($serverScore == $score) { //删掉该$id $result = (float)$this->_redis->zset->delete("Queue:$name", $id); if ($result == false) { Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $result; }
学过数据结构这门课的朋友都应该知道,队列操作还有弹出顶部某个值的方法等等,这里处理入队出队操作
/** * 获取队列顶部若干个Task 并将其出队 * @param [type] $name 队列名称 * @param integer $count 数量 * @param integer $timeout 超时时间 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function pop($name, $count = 1, $timeout = 10) { //合法性检测 if (empty($name) || $count <= 0) return []; //加锁 if (!$this->_redis->lock->lock("Queue:$name")) { Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); return false; } //取出若干的Task $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); //将其放在$result数组里 并 删除掉redis对应的id foreach ($array as $id => $score) { $result[] = ['id'=>$id, 'score'=>$score]; $this->_redis->zset->delete("Queue:$name", $id); } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $count == 1 ? (empty($result) ? false : $result[0]) : $result; }
以上就是用Redis实现任务队列的整一套思路和代码实现的总结和分享
/** * 任务队列 * */ class RedisQueue { private $_redis; public function construct($param = null) { $this->_redis = RedisFactory::get($param); } /** * 入队一个 Task * @param [type] $name 队列名称 * @param [type] $id 任务id(或者其数组) * @param integer $timeout 入队超时时间(秒) * @param integer $afterInterval [description] * @return [type] [description] */ public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) { //合法性检测 if (empty($name) || empty($id) || $timeout <= 0) return false; //加锁 if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) { Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); return false; } //入队时以当前时间戳作为 score $score = microtime(true) + $afterInterval; //入队 foreach ((array)$id as $item) { //先判断下是否已经存在该id了 if (false === $this->_redis->zset->getScore("Queue:$name", $item)) { $this->_redis->zset->add("Queue:$name", $score, $item); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return true; } /** * 出队一个Task,需要指定$id 和 $score * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理 * * @param [type] $name 队列名称 * @param [type] $id 任务标识 * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队 * @param integer $timeout 超时时间(秒) * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过) */ public function dequeue($name, $id, $score, $timeout = 10) { //合法性检测 if (empty($name) || empty($id) || empty($score)) return false; //加锁 if (!$this->_redis->lock->lock("Queue:$name", $timeout)) { Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); return false; } //出队 //先取出redis的score $serverScore = $this->_redis->zset->getScore("Queue:$name", $id); $result = false; //先判断传进来的score和redis的score是否是一样 if ($serverScore == $score) { //删掉该$id $result = (float)$this->_redis->zset->delete("Queue:$name", $id); if ($result == false) { Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $result; } /** * 获取队列顶部若干个Task 并将其出队 * @param [type] $name 队列名称 * @param integer $count 数量 * @param integer $timeout 超时时间 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function pop($name, $count = 1, $timeout = 10) { //合法性检测 if (empty($name) || $count <= 0) return []; //加锁 if (!$this->_redis->lock->lock("Queue:$name")) { Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); return false; } //取出若干的Task $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); //将其放在$result数组里 并 删除掉redis对应的id foreach ($array as $id => $score) { $result[] = ['id'=>$id, 'score'=>$score]; $this->_redis->zset->delete("Queue:$name", $id); } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $count == 1 ? (empty($result) ? false : $result[0]) : $result; } /** * 获取队列顶部的若干个Task * @param [type] $name 队列名称 * @param integer $count 数量 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function top($name, $count = 1) { //合法性检测 if (empty($name) || $count < 1) return []; //取错若干个Task $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); //将Task存放在数组里 foreach ($array as $id => $score) { $result[] = ['id'=>$id, 'score'=>$score]; } //返回数组 return $count == 1 ? (empty($result) ? false : $result[0]) : $result; } }
到此,这两大块功能基本讲解完毕,对于任务队列,你可以写一个shell脚本,让服务器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际应用结合起来去实现了,大家理解好这两大功能的实现思路即可,由于代码用的是PHP语言来写的,如果你理解了实现思路,你完全可以使用java或者是.net等等其他语言去实现这两个功能。这两大功能的应用场景十分多,特别是秒杀,另一个就是春运抢火车票,这两个是最鲜明的例子了。当然还有很多地方用到,这里我不再一一列举。
附上分布式锁和任务队列这两个类:
/** *在redis上实现分布式锁 */ class RedisLock { private $redisString; private $lockedNames = []; public function construct($param = NULL) { $this->redisString = RedisFactory::get($param)->string; } /** * 加锁 * @param [type] $name 锁的标识名 * @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待 * @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放 * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; //取得当前时间 $now = time(); //获取锁失败时的等待超时时刻 $timeoutAt = $now + $timeout; //锁的最大生存时刻 $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; while (true) { //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放 $result = $this->redisString->setnx($redisKey, $expireAt); if ($result != false) { //设置key的失效时间 $this->redisString->expire($redisKey, $expireAt); //将锁标志放到lockedNames数组里 $this->lockedNames[$name] = $expireAt; return true; } //以秒为单位,返回给定key的剩余生存时间 $ttl = $this->redisString->ttl($redisKey); //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 //这时可以直接设置expire并把锁纳为己用 if ($ttl < 0) { $this->redisString->set($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; } /*****循环请求锁部分*****/ //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; //隔 $waitIntervalUs 后继续 请求 usleep($waitIntervalUs); } return false; } /** * 解锁 * @param [type] $name [description] * @return [type] [description] */ public function unlock($name) { //先判断是否存在此锁 if ($this->isLocking($name)) { //删除锁 if ($this->redisString->deleteKey("Lock:$name")) { //清掉lockedNames里的锁标志 unset($this->lockedNames[$name]); return true; } } return false; } /** * 释放当前所有获得的锁 * @return [type] [description] */ public function unlockAll() { //此标志是用来标志是否释放所有锁成功 $allSuccess = true; foreach ($this->lockedNames as $name => $expireAt) { if (false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; } /** * 给当前所增加指定生存时间,必须大于0 * @param [type] $name [description] * @return [type] [description] */ public function expire($name, $expire) { //先判断是否存在该锁 if ($this->isLocking($name)) { //所指定的生存时间必须大于0 $expire = max($expire, 1); //增加锁生存时间 if ($this->redisString->expire("Lock:$name", $expire)) { return true; } } return false; } /** * 判断当前是否拥有指定名字的所 * @param [type] $name [description] * @return boolean [description] */ public function isLocking($name) { //先看lonkedName[$name]是否存在该锁标志名 if (isset($this->lockedNames[$name])) { //从redis返回该锁的生存时间 return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name"); } return false; } } /** * 任务队列 */ class RedisQueue { private $_redis; public function construct($param = null) { $this->_redis = RedisFactory::get($param); } /** * 入队一个 Task * @param [type] $name 队列名称 * @param [type] $id 任务id(或者其数组) * @param integer $timeout 入队超时时间(秒) * @param integer $afterInterval [description] * @return [type] [description] */ public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) { //合法性检测 if (empty($name) || empty($id) || $timeout <= 0) return false; //加锁 if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) { Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); return false; } //入队时以当前时间戳作为 score $score = microtime(true) + $afterInterval; //入队 foreach ((array)$id as $item) { //先判断下是否已经存在该id了 if (false === $this->_redis->zset->getScore("Queue:$name", $item)) { $this->_redis->zset->add("Queue:$name", $score, $item); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return true; } /** * 出队一个Task,需要指定$id 和 $score * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理 * * @param [type] $name 队列名称 * @param [type] $id 任务标识 * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队 * @param integer $timeout 超时时间(秒) * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过) */ public function dequeue($name, $id, $score, $timeout = 10) { //合法性检测 if (empty($name) || empty($id) || empty($score)) return false; //加锁 if (!$this->_redis->lock->lock("Queue:$name", $timeout)) { Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); return false; } //出队 //先取出redis的score $serverScore = $this->_redis->zset->getScore("Queue:$name", $id); $result = false; //先判断传进来的score和redis的score是否是一样 if ($serverScore == $score) { //删掉该$id $result = (float)$this->_redis->zset->delete("Queue:$name", $id); if ($result == false) { Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); } } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $result; } /** * 获取队列顶部若干个Task 并将其出队 * @param [type] $name 队列名称 * @param integer $count 数量 * @param integer $timeout 超时时间 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function pop($name, $count = 1, $timeout = 10) { //合法性检测 if (empty($name) || $count <= 0) return []; //加锁 if (!$this->_redis->lock->lock("Queue:$name")) { Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); return false; } //取出若干的Task $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); //将其放在$result数组里 并 删除掉redis对应的id foreach ($array as $id => $score) { $result[] = ['id'=>$id, 'score'=>$score]; $this->_redis->zset->delete("Queue:$name", $id); } //解锁 $this->_redis->lock->unlock("Queue:$name"); return $count == 1 ? (empty($result) ? false : $result[0]) : $result; } /** * 获取队列顶部的若干个Task * @param [type] $name 队列名称 * @param integer $count 数量 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function top($name, $count = 1) { //合法性检测 if (empty($name) || $count < 1) return []; //取错若干个Task $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); //将Task存放在数组里 foreach ($array as $id => $score) { $result[] = ['id'=>$id, 'score'=>$score]; } //返回数组 return $count == 1 ? (empty($result) ? false : $result[0]) : $result; } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!