>  기사  >  데이터 베이스  >  Redis와 결합된 Go에서 분산 잠금을 구현하는 방법

Redis와 결합된 Go에서 분산 잠금을 구현하는 방법

PHPz
PHPz앞으로
2023-05-27 21:55:241240검색

    Single Redis 인스턴스 시나리오

    Redis 명령어에 익숙하신 분이라면 바로 Redis의 set if not presents 연산을 사용하여 구현하는 것을 생각해 보실 수 있으며, 현재 표준 구현 방식은 SET Resource_name my_random_value NX PX입니다. 30000개 명령 시리즈, 여기서

    • resource_name은 잠길 리소스를 의미합니다.

    • NX는 리소스가 없으면 설정한다는 의미입니다.

    • PX 30000은 만료 시간이 30000밀리초, 즉 30초임을 의미합니다.

    • my_random_value 이 값은 모든 고객이 사용합니다. 끝은 고유해야 하며 동일한 키를 보유한 모든 취득자(경쟁자)가 동일한 값을 가질 수 없습니다.

    잠금을 더 안전하게 해제하려면 주로 값의 값을 임의의 숫자로 설정해야 합니다. 잠금을 해제할 때 스크립트를 사용하여 Redis에게 키가 존재하고 저장된 값이 내가 지정한 값과 동일한 경우에만 알립니다. 삭제가 성공했다는 것을 알 수 있나요? 이는 다음 Lua 스크립트를 통해 달성할 수 있습니다.

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

    예: 클라이언트 A가 리소스 잠금을 획득했지만 다른 작업에 의해 즉시 차단되었습니다. 클라이언트 A가 다른 작업을 실행한 후 잠금을 해제하려고 하면 원래 잠금 시간이 이미 초과되었습니다. 그리고 Redis에 의해 자동으로 해제되었으며, 이 기간 동안 클라이언트 B가 리소스 잠금을 다시 획득했습니다.

    Lua 스크립트를 사용하는 이유는 판단과 삭제가 두 가지 작업이기 때문에 A가 판단하자마자 만료된 후 자동으로 잠금을 해제하고 B가 잠금을 획득한 후 A가 Del을 호출하는 경우도 있을 수 있습니다. , B의 잠금이 해제됩니다.

    잠금 해제의 예

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var client *redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       ok, err := client.SetNX(ctx, resourceName, myRandomValue, time.Second*30).Result()
       if err != nil {
          return err
       }
       if !ok {
          return errors.New("系统繁忙,请重试")
       }
       // 解锁
       defer func() {
          script := redis.NewScript(unlockScript)
          script.Run(ctx, client, []string{resourceName}, myRandomValue)
       }()
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       client = redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
       })
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
    }

    먼저 복권 연산을 시뮬레이션하는 lottery() 함수를 살펴보겠습니다. 함수에 들어갈 때 먼저 SET Resource_name my_random_value NX PX 30000을 사용하여 잠급니다. 여기서는 UUID가 임의의 값으로 사용됩니다. 작업이 실패하면 직접 Return하고 사용자가 다시 시도하도록 합니다. defer에서 잠금 해제 로직이 성공적으로 실행되면 잠금 해제 로직은 위에서 언급한 Lua 스크립트를 실행한 후 비즈니스 처리를 수행하는 것입니다.

    lottery() 함수를 동시에 호출하기 위해 main() 함수에서 두 개의 고루틴을 실행했습니다. 잠금을 얻을 수 없기 때문에 작업 중 하나가 직접 실패합니다.

    요약

    • 임의의 값 생성

    • SET Resource_name my_random_value NX PX 30000을 사용하여 잠그세요

    • 잠금에 실패하면 직접 반환하세요

    • 잠금 해제 로직을 추가하여 잠금 해제를 확인하세요. 잠금이 해제되다 함수가 종료되면 실행

    • 비즈니스 로직 실행

    다중 Redis 인스턴스 시나리오

    단일 인스턴스의 경우 이 인스턴스가 정지되면 잠금을 얻을 수 없기 때문에 모든 요청이 실패하므로 여러 개가 필요합니다. 서로 다른 곳에 분산된 Redis 인스턴스 시스템의 Redis 인스턴스와 대부분의 노드에 대한 잠금이 성공적으로 잠길 수 있는 것이 RedLock 알고리즘입니다. 동시에 여러 Redis 인스턴스에 대한 잠금을 획득해야 하지만 실제로는 단일 인스턴스 알고리즘을 기반으로 합니다.

    잠금 해제 예제 추가

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var clients []*redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加锁
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       var wg sync.WaitGroup
       wg.Add(len(clients))
       // 这里主要是确保不要加锁太久,这样会导致业务处理的时间变少
       lockCtx, _ := context.WithTimeout(ctx, time.Millisecond*5)
       // 成功获得锁的Redis实例的客户端
       successClients := make(chan *redis.Client, len(clients))
       for _, client := range clients {
          go func(client *redis.Client) {
             defer wg.Done()
             ok, err := client.SetNX(lockCtx, resourceName, myRandomValue, time.Second*30).Result()
             if err != nil {
                return
             }
             if !ok {
                return
             }
             successClients <- client
          }(client)
       }
       wg.Wait() // 等待所有获取锁操作完成
       close(successClients)
       // 解锁,不管加锁是否成功,最后都要把已经获得的锁给释放掉
       defer func() {
          script := redis.NewScript(unlockScript)
          for client := range successClients {
             go func(client *redis.Client) {
                script.Run(ctx, client, []string{resourceName}, myRandomValue)
             }(client)
          }
       }()
       // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败
       if len(successClients) < len(clients)/2+1 {
          return errors.New("系统繁忙,请重试")
       }
    
       // 业务处理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       clients = append(clients, redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   0,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   1,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   2,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   3,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   4,
       }))
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
       time.Sleep(time.Second) 
    }

    위 코드에서는 Redis의 다중 데이터베이스를 사용하여 여러 Redis 마스터 인스턴스를 시뮬레이션합니다. 일반적으로 실제 환경에서는 5개의 Redis 인스턴스를 선택합니다. 이러한 인스턴스는 서로 다른 시스템에 배포되어야 합니다. 동시 실패.
    잠금 논리에서는 주로 각 Redis 인스턴스에서 SET Resource_name my_random_value NX PX 30000을 실행하여 잠금을 획득한 다음 성공적으로 잠금을 획득한 클라이언트를 채널에 넣습니다(여기서 슬라이스를 사용하면 동시성 문제가 발생할 수 있음). sync.WaitGroup을 사용하여 잠금 획득 작업이 끝날 때까지 기다립니다.
    그런 다음 지연을 추가하여 잠금 로직을 해제하세요. 잠금 해제 로직은 매우 간단합니다. 성공적으로 획득한 잠금을 해제하면 됩니다.
    마지막으로 획득한 잠금 개수가 절반 이상인지 판단합니다. 절반 이상 획득하지 못한 경우 잠금에 실패한 것입니다.
    잠금에 성공하면 다음 단계는 비즈니스 처리를 수행하는 것입니다.

    요약

    • 임의의 값을 생성하고

    • 각 Redis 인스턴스에 사용하도록 보냅니다.SET resource_name my_random_value NX PX 30000Lock

    • 모든 잠금 획득 작업이 완료될 때까지 기다립니다.

    • defer는 잠금 해제 논리를 추가하여 다음을 보장합니다. 함수가 Execution을 종료하면 잠금이 해제됩니다. 여기서는 Redis 인스턴스의 일부 잠금을 얻을 수 있으므로 먼저 연기한 후 판단하지만 절반을 넘지 않기 때문에 여전히 잠금 실패로 판단됩니다

    • Redis 인스턴스의 절반 이상에 대한 잠금을 얻었는지 확인하고 설명이 없으면 잠금에 실패하면 바로

    • 로 돌아가 비즈니스 로직을 실행

    위 내용은 Redis와 결합된 Go에서 분산 잠금을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제