搜索
首页后端开发php教程php redis分布式锁和任务队列代码实例详解

1.Redis实现分布式锁思路

  思路很简单,主要用到的redis函数是setnx(),这个应该是实现分布式锁最主要的函数。首先是将某一任务标识名(这里用Lock:order作为标识名的例子)作为键存到redis里,并为其设个过期时间,如果是还有Lock:order请求过来,先是通过setnx()看看是否能将Lock:order插入到redis里,可以的话就返回true,不可以就返回false。当然,在我的代码里会比这个思路复杂一些,我会在分析代码时进一步说明。

2.Redis实现任务队列

  这里的实现会用到上面的Redis分布式的锁机制,主要是用到了Redis里的有序集合这一数据结构。例如入队时,通过zset的add()函数进行入队,而出对时,可以用到zset的getScore()函数。另外还可以弹出顶部的几个任务。

3. 代码分析

(一)先来分析Redis分布式锁的代码实现  

(1)为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过lock方法的参数设置或者使用默认值),超出生存时间锁会被自动释放锁的生存时间默认比较短(秒级),因此,若需要长时间加锁,可以通过expire方法延长锁的生存时间为适当时间,比如在循环内。

(2)系统级的锁当进程无论何种原因时出现crash时,操作系统会自己回收锁,所以不会出现资源丢失,但分布式锁不用,若一次性设置很长时间,一旦由于各种原因出现进程crash 或者其他异常导致unlock未被调用时,则该锁在剩下的时间就会变成垃圾锁,导致其他进程或者进程重启后无法进入加锁区域。

先看加锁的实现代码:这里需要主要两个参数,一个是$timeout,这个是循环获取锁的等待时间,在这个时间内会一直尝试获取锁知道超时,如果为0,则表示获取锁失败后直接返回而不再等待;另一个重要参数的$expire,这个参数指当前锁的最大生存时间,以秒为单位的,它必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放。这个参数的最要作用请看上面的(1)里的解释。

  这里先取得当前时间,然后再获取到锁失败时的等待超时的时刻(是个时间戳),再获取到锁的最大生存时刻是多少。这里redis的key用这种格式:"Lock:锁的标识名",这里就开始进入循环了,先是插入数据到redis里,使用setnx()函数,这函数的意思是,如果该键不存在则插入数据,将最大生存时刻作为值存储,假如插入成功,则对该键进行失效时间的设置,并将该键放在$lockedName数组里,返回true,也就是上锁成功;如果该键存在,则不会插入操作了,这里有一步严谨的操作,那就是取得当前键的剩余时间,假如这个时间小于0,表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用,这时可以直接设置expire并把锁纳为己用。如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出循环,反之则 隔 $waitIntervalUs 后继续 请求。  这就是加锁的整一个代码分析。

/**
   * 加锁
   * @param [type] $name      锁的标识名
   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待
   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
   * @return [type]         [description]
   */
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    //取得当前时间
    $now = time();
    //获取锁失败时的等待超时时刻
    $timeoutAt = $now + $timeout;
    //锁的最大生存时刻
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        //设置key的失效时间
        $this->redisString->expire($redisKey, $expireAt);
        //将锁标志放到lockedNames数组里
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      //以秒为单位,返回给定key的剩余生存时间
      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
      //这时可以直接设置expire并把锁纳为己用
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      /*****循环请求锁部分*****/
      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求
      usleep($waitIntervalUs);

    }

    return false;
  }

  接着看解锁的代码分析:解锁就简单多了,传入参数就是锁标识,先是判断是否存在该锁,存在的话,就从redis里面通过deleteKey()函数删除掉锁标识即可。

/**
   * 解锁
   * @param [type] $name [description]
   * @return [type]    [description]
   */
  public function unlock($name) {
    //先判断是否存在此锁
    if ($this->isLocking($name)) {
      //删除锁
      if ($this->redisString->deleteKey("Lock:$name")) {
        //清掉lockedNames里的锁标志
        unset($this->lockedNames[$name]);
        return true;
      }
    }
    return false;
  }
    在贴上删除掉所有锁的方法,其实都一个样,多了个循环遍历而已。
/**
   * 释放当前所有获得的锁
   * @return [type] [description]
   */
  public function unlockAll() {
    //此标志是用来标志是否释放所有锁成功
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
      }
    }
    return $allSuccess;
  }

  以上就是用Redis实现分布式锁的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:

/**
 *在redis上实现分布式锁
 */
class RedisLock {
  private $redisString;
  private $lockedNames = [];

  public function construct($param = NULL) {
    $this->redisString = RedisFactory::get($param)->string;
  }

  /**
   * 加锁
   * @param [type] $name      锁的标识名
   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待
   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
   * @return [type]         [description]
   */
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    //取得当前时间
    $now = time();
    //获取锁失败时的等待超时时刻
    $timeoutAt = $now + $timeout;
    //锁的最大生存时刻
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        //设置key的失效时间
        $this->redisString->expire($redisKey, $expireAt);
        //将锁标志放到lockedNames数组里
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      //以秒为单位,返回给定key的剩余生存时间
      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
      //这时可以直接设置expire并把锁纳为己用
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      /*****循环请求锁部分*****/
      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求
      usleep($waitIntervalUs);

    }

    return false;
  }

  /**
   * 解锁
   * @param [type] $name [description]
   * @return [type]    [description]
   */
  public function unlock($name) {
    //先判断是否存在此锁
    if ($this->isLocking($name)) {
      //删除锁
      if ($this->redisString->deleteKey("Lock:$name")) {
        //清掉lockedNames里的锁标志
        unset($this->lockedNames[$name]);
        return true;
      }
    }
    return false;
  }

  /**
   * 释放当前所有获得的锁
   * @return [type] [description]
   */
  public function unlockAll() {
    //此标志是用来标志是否释放所有锁成功
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
      }
    }
    return $allSuccess;
  }

  /**
   * 给当前所增加指定生存时间,必须大于0
   * @param [type] $name [description]
   * @return [type]    [description]
   */
  public function expire($name, $expire) {
    //先判断是否存在该锁
    if ($this->isLocking($name)) {
      //所指定的生存时间必须大于0
      $expire = max($expire, 1);
      //增加锁生存时间
      if ($this->redisString->expire("Lock:$name", $expire)) {
        return true;
      }
    }
    return false;
  }

  /**
   * 判断当前是否拥有指定名字的所
   * @param [type] $name [description]
   * @return boolean    [description]
   */
  public function isLocking($name) {
    //先看lonkedName[$name]是否存在该锁标志名
    if (isset($this->lockedNames[$name])) {
      //从redis返回该锁的生存时间
      return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name");
    }

    return false;
  }

}

(二)用Redis实现任务队列的代码分析

(1)任务队列,用于将业务逻辑中可以异步处理的操作放入队列中,在其他线程中处理后出队

(2)队列中使用了分布式锁和其他逻辑,保证入队和出队的一致性

(3)这个队列和普通队列不一样,入队时的id是用来区分重复入队的,队列里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求要求重复入队当做不用的任务,请使用不同的id区分

  先看入队的代码分析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,就是开始加锁,入队时我这里选择当前时间戳作为score,接着就是入队了,使用的是zset数据结构的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。

/**
   * 入队一个 Task
   * @param [type] $name     队列名称
   * @param [type] $id      任务id(或者其数组)
   * @param integer $timeout    入队超时时间(秒)
   * @param integer $afterInterval [description]
   * @return [type]         [description]
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    //合法性检测
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get(&#39;queue&#39;)->error("enqueue faild becouse of lock failure: name = $name, id = $id");
      return false;
    }
    
    //入队时以当前时间戳作为 score
    $score = microtime(true) + $afterInterval;
    //入队
    foreach ((array)$id as $item) {
      //先判断下是否已经存在该id了
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  接着来看一下出队的代码分析:出队一个Task,需要指定它的$id 和 $score,如果$score与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了,先使用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行对比,如果两者相等就进行出队操作,也就是使用zset里的delete()方法删掉该任务id,最后当前就是解锁了。这就是出队的代码分析。

/**
   * 出队一个Task,需要指定$id 和 $score
   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
   * 
   * @param [type] $name  队列名称 
   * @param [type] $id   任务标识
   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
   * @param integer $timeout 超时时间(秒)
   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    //合法性检测
    if (empty($name) || empty($id) || empty($score)) return false;
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger:get(&#39;queue&#39;)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
      return false;
    }
    
    //出队
    //先取出redis的score
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    //先判断传进来的score和redis的score是否是一样
    if ($serverScore == $score) {
      //删掉该$id
      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get(&#39;queue&#39;)->error("dequeue faild because of redis delete failure: name =$name, id = $id");
      }
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }

  学过数据结构这门课的朋友都应该知道,队列操作还有弹出顶部某个值的方法等等,这里处理入队出队操作

/**
   * 获取队列顶部若干个Task 并将其出队
   * @param [type] $name  队列名称
   * @param integer $count  数量
   * @param integer $timeout 超时时间
   * @return [type]      返回数组[0=>[&#39;id&#39;=> , &#39;score&#39;=> ], 1=>[&#39;id&#39;=> , &#39;score&#39;=> ], 2=>[&#39;id&#39;=> , &#39;score&#39;=> ]]
   */
  public function pop($name, $count = 1, $timeout = 10) {
    //合法性检测
    if (empty($name) || $count <= 0) return []; 
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name")) {
      Log::get(&#39;queue&#39;)->error("pop faild because of pop failure: name = $name, count = $count");
      return false;
    }
    
    //取出若干的Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id
    foreach ($array as $id => $score) {
      $result[] = [&#39;id&#39;=>$id, &#39;score&#39;=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

  以上就是用Redis实现任务队列的整一套思路和代码实现的总结和分享

/**
 * 任务队列
 * 
 */
class RedisQueue {
  private $_redis;

  public function construct($param = null) {
    $this->_redis = RedisFactory::get($param);
  }

  /**
   * 入队一个 Task
   * @param [type] $name     队列名称
   * @param [type] $id      任务id(或者其数组)
   * @param integer $timeout    入队超时时间(秒)
   * @param integer $afterInterval [description]
   * @return [type]         [description]
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    //合法性检测
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get(&#39;queue&#39;)->error("enqueue faild becouse of lock failure: name = $name, id = $id");
      return false;
    }
    
    //入队时以当前时间戳作为 score
    $score = microtime(true) + $afterInterval;
    //入队
    foreach ((array)$id as $item) {
      //先判断下是否已经存在该id了
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**
   * 出队一个Task,需要指定$id 和 $score
   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
   * 
   * @param [type] $name  队列名称 
   * @param [type] $id   任务标识
   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
   * @param integer $timeout 超时时间(秒)
   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    //合法性检测
    if (empty($name) || empty($id) || empty($score)) return false;
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger:get(&#39;queue&#39;)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
      return false;
    }
    
    //出队
    //先取出redis的score
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    //先判断传进来的score和redis的score是否是一样
    if ($serverScore == $score) {
      //删掉该$id
      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get(&#39;queue&#39;)->error("dequeue faild because of redis delete failure: name =$name, id = $id");
      }
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }

  /**
   * 获取队列顶部若干个Task 并将其出队
   * @param [type] $name  队列名称
   * @param integer $count  数量
   * @param integer $timeout 超时时间
   * @return [type]      返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
   */
  public function pop($name, $count = 1, $timeout = 10) {
    //合法性检测
    if (empty($name) || $count <= 0) return []; 
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name")) {
      Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
      return false;
    }
    
    //取出若干的Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

  /**
   * 获取队列顶部的若干个Task
   * @param [type] $name 队列名称
   * @param integer $count 数量
   * @return [type]     返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
   */
  public function top($name, $count = 1) {
    //合法性检测
    if (empty($name) || $count < 1) return [];

    //取错若干个Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
    
    //将Task存放在数组里
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
    }

    //返回数组 
    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;    
  }
}

  到此,这两大块功能基本讲解完毕,对于任务队列,你可以写一个shell脚本,让服务器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际应用结合起来去实现了,大家理解好这两大功能的实现思路即可,由于代码用的是PHP语言来写的,如果你理解了实现思路,你完全可以使用java或者是.net等等其他语言去实现这两个功能。这两大功能的应用场景十分多,特别是秒杀,另一个就是春运抢火车票,这两个是最鲜明的例子了。当然还有很多地方用到,这里我不再一一列举。

附上分布式锁和任务队列这两个类:

/**
 *在redis上实现分布式锁
 */
class RedisLock {
  private $redisString;
  private $lockedNames = [];

  public function construct($param = NULL) {
    $this->redisString = RedisFactory::get($param)->string;
  }

  /**
   * 加锁
   * @param [type] $name      锁的标识名
   * @param integer $timeout    循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待
   * @param integer $expire     当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
   * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
   * @return [type]         [description]
   */
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    //取得当前时间
    $now = time();
    //获取锁失败时的等待超时时刻
    $timeoutAt = $now + $timeout;
    //锁的最大生存时刻
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        //设置key的失效时间
        $this->redisString->expire($redisKey, $expireAt);
        //将锁标志放到lockedNames数组里
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      //以秒为单位,返回给定key的剩余生存时间
      $ttl = $this->redisString->ttl($redisKey);

      //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
      //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
      //这时可以直接设置expire并把锁纳为己用
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->lockedNames[$name] = $expireAt;
        return true;
      }

      /*****循环请求锁部分*****/
      //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 后继续 请求
      usleep($waitIntervalUs);

    }

    return false;
  }

  /**
   * 解锁
   * @param [type] $name [description]
   * @return [type]    [description]
   */
  public function unlock($name) {
    //先判断是否存在此锁
    if ($this->isLocking($name)) {
      //删除锁
      if ($this->redisString->deleteKey("Lock:$name")) {
        //清掉lockedNames里的锁标志
        unset($this->lockedNames[$name]);
        return true;
      }
    }
    return false;
  }

  /**
   * 释放当前所有获得的锁
   * @return [type] [description]
   */
  public function unlockAll() {
    //此标志是用来标志是否释放所有锁成功
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
      }
    }
    return $allSuccess;
  }

  /**
   * 给当前所增加指定生存时间,必须大于0
   * @param [type] $name [description]
   * @return [type]    [description]
   */
  public function expire($name, $expire) {
    //先判断是否存在该锁
    if ($this->isLocking($name)) {
      //所指定的生存时间必须大于0
      $expire = max($expire, 1);
      //增加锁生存时间
      if ($this->redisString->expire("Lock:$name", $expire)) {
        return true;
      }
    }
    return false;
  }

  /**
   * 判断当前是否拥有指定名字的所
   * @param [type] $name [description]
   * @return boolean    [description]
   */
  public function isLocking($name) {
    //先看lonkedName[$name]是否存在该锁标志名
    if (isset($this->lockedNames[$name])) {
      //从redis返回该锁的生存时间
      return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name");
    }

    return false;
  }

}

/**
 * 任务队列
 */
class RedisQueue {
  private $_redis;

  public function construct($param = null) {
    $this->_redis = RedisFactory::get($param);
  }

  /**
   * 入队一个 Task
   * @param [type] $name     队列名称
   * @param [type] $id      任务id(或者其数组)
   * @param integer $timeout    入队超时时间(秒)
   * @param integer $afterInterval [description]
   * @return [type]         [description]
   */
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    //合法性检测
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    //加锁
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get(&#39;queue&#39;)->error("enqueue faild becouse of lock failure: name = $name, id = $id");
      return false;
    }
    
    //入队时以当前时间戳作为 score
    $score = microtime(true) + $afterInterval;
    //入队
    foreach ((array)$id as $item) {
      //先判断下是否已经存在该id了
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);
      }
    }
    
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return true;

  }

  /**
   * 出队一个Task,需要指定$id 和 $score
   * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
   * 
   * @param [type] $name  队列名称 
   * @param [type] $id   任务标识
   * @param [type] $score  任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
   * @param integer $timeout 超时时间(秒)
   * @return [type]      Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
   */
  public function dequeue($name, $id, $score, $timeout = 10) {
    //合法性检测
    if (empty($name) || empty($id) || empty($score)) return false;
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger:get(&#39;queue&#39;)->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
      return false;
    }
    
    //出队
    //先取出redis的score
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    //先判断传进来的score和redis的score是否是一样
    if ($serverScore == $score) {
      //删掉该$id
      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get(&#39;queue&#39;)->error("dequeue faild because of redis delete failure: name =$name, id = $id");
      }
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $result;
  }

  /**
   * 获取队列顶部若干个Task 并将其出队
   * @param [type] $name  队列名称
   * @param integer $count  数量
   * @param integer $timeout 超时时间
   * @return [type]      返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
   */
  public function pop($name, $count = 1, $timeout = 10) {
    //合法性检测
    if (empty($name) || $count <= 0) return []; 
    
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name")) {
      Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
      return false;
    }
    
    //取出若干的Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //将其放在$result数组里 并 删除掉redis对应的id
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);
    }

    //解锁
    $this->_redis->lock->unlock("Queue:$name");

    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
  }

  /**
   * 获取队列顶部的若干个Task
   * @param [type] $name 队列名称
   * @param integer $count 数量
   * @return [type]     返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
   */
  public function top($name, $count = 1) {
    //合法性检测
    if (empty($name) || $count < 1) return [];

    //取错若干个Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
    
    //将Task存放在数组里
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
    }

    //返回数组 
    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;    
  }
}

以上是php redis分布式锁和任务队列代码实例详解的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
继续使用PHP:耐力的原因继续使用PHP:耐力的原因Apr 19, 2025 am 12:23 AM

PHP仍然流行的原因是其易用性、灵活性和强大的生态系统。1)易用性和简单语法使其成为初学者的首选。2)与web开发紧密结合,处理HTTP请求和数据库交互出色。3)庞大的生态系统提供了丰富的工具和库。4)活跃的社区和开源性质使其适应新需求和技术趋势。

PHP和Python:探索他们的相似性和差异PHP和Python:探索他们的相似性和差异Apr 19, 2025 am 12:21 AM

PHP和Python都是高层次的编程语言,广泛应用于Web开发、数据处理和自动化任务。1.PHP常用于构建动态网站和内容管理系统,而Python常用于构建Web框架和数据科学。2.PHP使用echo输出内容,Python使用print。3.两者都支持面向对象编程,但语法和关键字不同。4.PHP支持弱类型转换,Python则更严格。5.PHP性能优化包括使用OPcache和异步编程,Python则使用cProfile和异步编程。

PHP和Python:解释了不同的范例PHP和Python:解释了不同的范例Apr 18, 2025 am 12:26 AM

PHP主要是过程式编程,但也支持面向对象编程(OOP);Python支持多种范式,包括OOP、函数式和过程式编程。PHP适合web开发,Python适用于多种应用,如数据分析和机器学习。

PHP和Python:深入了解他们的历史PHP和Python:深入了解他们的历史Apr 18, 2025 am 12:25 AM

PHP起源于1994年,由RasmusLerdorf开发,最初用于跟踪网站访问者,逐渐演变为服务器端脚本语言,广泛应用于网页开发。Python由GuidovanRossum于1980年代末开发,1991年首次发布,强调代码可读性和简洁性,适用于科学计算、数据分析等领域。

在PHP和Python之间进行选择:指南在PHP和Python之间进行选择:指南Apr 18, 2025 am 12:24 AM

PHP适合网页开发和快速原型开发,Python适用于数据科学和机器学习。1.PHP用于动态网页开发,语法简单,适合快速开发。2.Python语法简洁,适用于多领域,库生态系统强大。

PHP和框架:现代化语言PHP和框架:现代化语言Apr 18, 2025 am 12:14 AM

PHP在现代化进程中仍然重要,因为它支持大量网站和应用,并通过框架适应开发需求。1.PHP7提升了性能并引入了新功能。2.现代框架如Laravel、Symfony和CodeIgniter简化开发,提高代码质量。3.性能优化和最佳实践进一步提升应用效率。

PHP的影响:网络开发及以后PHP的影响:网络开发及以后Apr 18, 2025 am 12:10 AM

PHPhassignificantlyimpactedwebdevelopmentandextendsbeyondit.1)ItpowersmajorplatformslikeWordPressandexcelsindatabaseinteractions.2)PHP'sadaptabilityallowsittoscaleforlargeapplicationsusingframeworkslikeLaravel.3)Beyondweb,PHPisusedincommand-linescrip

PHP类型提示如何起作用,包括标量类型,返回类型,联合类型和无效类型?PHP类型提示如何起作用,包括标量类型,返回类型,联合类型和无效类型?Apr 17, 2025 am 12:25 AM

PHP类型提示提升代码质量和可读性。1)标量类型提示:自PHP7.0起,允许在函数参数中指定基本数据类型,如int、float等。2)返回类型提示:确保函数返回值类型的一致性。3)联合类型提示:自PHP8.0起,允许在函数参数或返回值中指定多个类型。4)可空类型提示:允许包含null值,处理可能返回空值的函数。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热工具

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用