首页  >  文章  >  数据库  >  Go结合Redis怎么实现分布式锁

Go结合Redis怎么实现分布式锁

PHPz
PHPz转载
2023-05-27 21:55:241240浏览

    单Redis实例场景

    如果熟悉Redis的命令,可能会马上想到使用Redis的set if not exists操作来实现,并且现在标准的实现方式是SET resource_name my_random_value NX PX 30000这串命令,其中:

    • resource_name表示要锁定的资源

    • NX表示如果不存在则设置

    • PX 30000表示过期时间为30000毫秒,也就是30秒

    • my_random_value这个值在所有的客户端必须是唯一的,所有同一key的获取者(竞争者)这个值都不能一样。

    value的值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在并且存储的值和我指定的值一样才能告诉我删除成功。可以通过以下Lua脚本实现:

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

    举个例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。

    使用Lua脚本是因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。

    加解锁示例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var client *redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       ok, err := client.SetNX(ctx, resourceName, myRandomValue, time.Second*30).Result()
       if err != nil {
          return err
       }
       if !ok {
          return errors.New("系统繁忙,请重试")
       }
       // 解锁
       defer func() {
          script := redis.NewScript(unlockScript)
          script.Run(ctx, client, []string{resourceName}, myRandomValue)
       }()
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       client = redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
       })
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
    }

    我们先看lottery()函数,这里模拟一个抽奖操作,在进入函数时,先使用SET resource_name my_random_value NX PX 30000加锁,这里使用UUID作为随机值,如果操作失败,直接返回,让用户重试,如果成功在defer里面执行解锁逻辑,解锁逻辑就是执行前面说到得lua脚本,然后再进行业务处理。

    我们在main()函数里面执行了两个goroutine并发调用lottery()函数,其中有一个操作会因为拿不到锁而直接失败。

    小结

    • 生成随机值

    • 使用SET resource_name my_random_value NX PX 30000加锁

    • 如果加锁失败,直接返回

    • defer添加解锁逻辑,保证在函数退出的时候会执行

    • 执行业务逻辑

    多Redis实例场景

    在单实例情况下,如果这个实例挂了,那么所有请求都会因为拿不到锁而失败,所以我们需要多个分布在不同机器上的Redis实例,并且拿到其中大多数节点的锁才能加锁成功,这也就是RedLock算法。我们需要同时对多个Redis实例获取锁,但它实际上也是基于单实例算法的。

    加解锁示例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var clients []*redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       var wg sync.WaitGroup
       wg.Add(len(clients))
       // 这里主要是确保不要加锁太久,这样会导致业务处理的时间变少
       lockCtx, _ := context.WithTimeout(ctx, time.Millisecond*5)
       // 成功获得锁的Redis实例的客户端
       successClients := make(chan *redis.Client, len(clients))
       for _, client := range clients {
          go func(client *redis.Client) {
             defer wg.Done()
             ok, err := client.SetNX(lockCtx, resourceName, myRandomValue, time.Second*30).Result()
             if err != nil {
                return
             }
             if !ok {
                return
             }
             successClients <- client
          }(client)
       }
       wg.Wait() // 等待所有获取锁操作完成
       close(successClients)
       // 解锁,不管加锁是否成功,最后都要把已经获得的锁给释放掉
       defer func() {
          script := redis.NewScript(unlockScript)
          for client := range successClients {
             go func(client *redis.Client) {
                script.Run(ctx, client, []string{resourceName}, myRandomValue)
             }(client)
          }
       }()
       // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败
       if len(successClients) < len(clients)/2+1 {
          return errors.New("系统繁忙,请重试")
       }
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       clients = append(clients, redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   0,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   1,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   2,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   3,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   4,
       }))
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
       time.Sleep(time.Second) 
    }

    在上面的代码中,我们使用Redis的多数据库模拟多个Redis master实例,一般我们会选择5个Redis实例,真实环境中这些实例应该是分布在不同机器上的,避免同时失效。
    在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000获取锁,然后把成功获取锁的客户端放到一个channel里(这里使用slice可能有并发问题),同时使用sync.WaitGroup等待所以获取锁操作结束。
    然后添加defer释放锁逻辑,释放锁逻辑很简单,只是把成功拿到的锁给释放掉即可。
    最后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败。
    如果加锁成功接下来就是进行业务处理。

    小结

    • 生成随机值

    • 并发给每个Redis实例使用SET resource_name my_random_value NX PX 30000加锁

    • 等待所有获取锁操作完成

    • defer添加解锁逻辑,保证在函数退出的时候会执行,这里先defer再判断是因为有可能获取到一部分Redis实例的锁,但是因为没有超过一半,还是会判断为加锁失败

    • 判断是否拿到一半以上Redis实例的锁,如果没有说明加锁失败,直接返回

    • 执行业务逻辑

    以上是Go结合Redis怎么实现分布式锁的详细内容。更多信息请关注PHP中文网其他相关文章!

    声明:
    本文转载于:yisu.com。如有侵权,请联系admin@php.cn删除