首頁 >資料庫 >Redis >Go結合Redis怎麼實現分散式鎖

Go結合Redis怎麼實現分散式鎖

PHPz
PHPz轉載
2023-05-27 21:55:241264瀏覽

    單Redis實例場景

    如果熟悉Redis的命令,可能會馬上想到使用Redis的set if not exists操作來實現,並且現在標準的實作方式是SET resource_name my_random_value NX PX 30000這串指令,其中:

    • resource_name表示要鎖定的資源

    • #NX表示如果不存在則設定

    • PX 30000表示過期時間為30000毫秒,也就是30秒

    • my_random_value這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。

    value的值必須是隨機數主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在並且儲存的值和我指定的值一樣才能告訴我刪除成功。可以透過以下Lua腳本實現:

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

    舉例:客戶端A取得資源鎖,但是緊接著被一個其他操作阻塞了,當客戶端A運行完畢其他操作後要釋放鎖時,原來的鎖早已超時並且被Redis自動釋放,並且在這段期間資源鎖定又被客戶端B再次取得到。

    使用Lua腳本是因為判斷和刪除是兩個操作,所以有可能A剛判斷完鎖就過期自動釋放了,然後B就獲取到了鎖,然後A又調用了Del,導致把B的鎖給釋放了。

    加上解鎖範例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var client *redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       ok, err := client.SetNX(ctx, resourceName, myRandomValue, time.Second*30).Result()
       if err != nil {
          return err
       }
       if !ok {
          return errors.New("系统繁忙,请重试")
       }
       // 解锁
       defer func() {
          script := redis.NewScript(unlockScript)
          script.Run(ctx, client, []string{resourceName}, myRandomValue)
       }()
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       client = redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
       })
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
    }

    我們先看lottery()函數,這裡模擬一個抽獎操作,在進入函數時,先使用SET resource_name my_random_value NX PX 30000加鎖,這裡使用UUID作為隨機值,如果操作失敗,直接返回,讓使用者重試,如果成功在defer裡面執行解鎖邏輯,解鎖邏輯就是執行前面說到得lua腳本,然後再進行業務處理。

    我們在main()函數裡面執行了兩個goroutine並發呼叫lottery()函數,其中有一個操作會因為拿不到鎖而直接失敗。

    小結

    • 產生隨機值

    • 使用SET resource_name my_random_value NX PX 30000加鎖定

    • #如果加鎖失敗,直接回傳

    • defer加入解鎖邏輯,保證在函數退出的時候會執行

    • 執行業務邏輯

    多Redis實例場景

    在單一實例情況下,如果這個實例掛了,那麼所有請求都會因為拿不到鎖而失敗,所以我們需要多個分佈在不同機器上的Redis實例,並且拿到其中大多數節點的鎖才能加鎖成功,也就是RedLock演算法。我們需要同時對多個Redis實例取得鎖,但它實際上也是基於單一實例演算法的。

    加上解鎖範例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var clients []*redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       var wg sync.WaitGroup
       wg.Add(len(clients))
       // 这里主要是确保不要加锁太久,这样会导致业务处理的时间变少
       lockCtx, _ := context.WithTimeout(ctx, time.Millisecond*5)
       // 成功获得锁的Redis实例的客户端
       successClients := make(chan *redis.Client, len(clients))
       for _, client := range clients {
          go func(client *redis.Client) {
             defer wg.Done()
             ok, err := client.SetNX(lockCtx, resourceName, myRandomValue, time.Second*30).Result()
             if err != nil {
                return
             }
             if !ok {
                return
             }
             successClients <- client
          }(client)
       }
       wg.Wait() // 等待所有获取锁操作完成
       close(successClients)
       // 解锁,不管加锁是否成功,最后都要把已经获得的锁给释放掉
       defer func() {
          script := redis.NewScript(unlockScript)
          for client := range successClients {
             go func(client *redis.Client) {
                script.Run(ctx, client, []string{resourceName}, myRandomValue)
             }(client)
          }
       }()
       // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败
       if len(successClients) < len(clients)/2+1 {
          return errors.New("系统繁忙,请重试")
       }
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       clients = append(clients, redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   0,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   1,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   2,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   3,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   4,
       }))
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
       time.Sleep(time.Second) 
    }

    在上面的程式碼中,我們使用Redis的多資料庫模擬多個Redis master實例,一般我們會選擇5個Redis實例,真實環境中這些實例應該是分佈在不同機器上的,避免同時失效。
    在加鎖邏輯裡,我們主要是對每個Redis實例執行SET resource_name my_random_value NX PX 30000取得鎖,然後把成功取得鎖定的客戶端放到一個channel裡(這裡使用slice可能有並發問題),同時使用sync.WaitGroup等待所以取得鎖定操作結束。
    接著加入defer釋放鎖定邏輯,釋放鎖定邏輯很簡單,只是把成功拿到的鎖定掉下來即可。
    最後判斷成功取得到的鎖的數量是否大於一半,如果沒有得到一半以上的鎖,表示加鎖失敗。
    如果加鎖成功接下來就是進行業務處理。

    小結

    • 產生隨機值

    • 並發給每個Redis實例使用SET resource_name my_random_value NX PX 30000 加上鎖定

    • 等待所有取得鎖定操作完成

    • #defer新增解鎖邏輯,保證在函數退出的時候會執行,這裡先defer再判斷是因為有可能取得到一部分Redis實例的鎖,但是因為沒有超過一半,還是會判斷為加鎖失敗

    • 判斷是否拿到一半以上Redis實例的鎖,如果沒有說明加鎖失敗,直接回傳

    • 執行業務邏輯

    以上是Go結合Redis怎麼實現分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除