There is a setting in Redis. If the
command does not exist, we can use this command to implement the mutex lock function. The standard implementation is recommended in the Redis official document. The method is SET resource_name my_random_value NX PX 30000
this series of commands, where:
##resource_name represents the resource to be locked
NX means if it does not exist, set it
PX 30000 means the expiration time is 30000 milliseconds, which is 30 seconds
my_random_valueThis value must be unique among all clients, and the value cannot be the same for all lock competitors with the same key.
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 endGive an example without Lua script: Client A obtains the resource lock, but then It was blocked by another operation. When client A wanted to release the lock after running other operations, the original lock had already timed out and was automatically released by Redis. During this period, the resource lock was acquired again by client B. Because judgment and deletion are two operations, it is possible that A will automatically release the lock upon expiration as soon as it judges it, and then B will acquire the lock, and then A will call Del, causing B's lock to be released. . TryLock and Unlock implementation
TryLock actually uses
SET resource_name my_random_value NX PX 30000 to lock, here we use
UUID As a random value, and the random value is returned when the lock is successful, this random value will be used when
Unlock;
UnlockThe unlocking logic is to execute what was mentioned earlier The
lua script.
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功 l.randomValue = randomValue return nil } func (l *Lock) Unlock(ctx context.Context) error { return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() }Lock implementation
Lock is a blocking acquisition lock, so when the lock fails, you need to retry. Of course, other abnormal situations may occur (such as network problems, request timeouts, etc.), and
error will be returned directly in these situations.
func (l *Lock) Lock(ctx context.Context) error { // 尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } // 加锁失败,不断尝试 ticker := time.NewTicker(l.tryLockInterval) defer ticker.Stop() for { select { case <-ctx.Done(): // 超时 return ErrTimeout case <-ticker.C: // 重新尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } } } }Implementing the watchdog mechanismThe mutex lock mentioned in our previous example has a small problem, that is, if it is held If client A with a lock is blocked, then A's lock may be automatically released after timeout, causing client B to acquire the lock in advance. In order to reduce the occurrence of this situation, we can continuously extend the expiration time of the lock while A holds the lock, and reduce the situation where client B acquires the lock in advance. This is the watchdog mechanism. Of course, there is no way to completely avoid the above situation, because if client A happens to close the connection with Redis after acquiring the lock, there is no way to extend the timeout. Watchdog implementationStart a thread when the lock is successful and continuously extend the lock expiration time; close the watchdog thread when Unlock is executed. The watchdog process is as follows:
func (l *Lock) startWatchDog() { ticker := time.NewTicker(l.ttl / 3) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2) ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result() cancel() // 异常或锁已经不存在则不再续期 if err != nil || !ok { return } case <-l.watchDog: // 已经解锁 return } } }TryLock: Start watchdog
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功,启动看门狗 go l.startWatchDog() return nil }Unlock: Turn off the watchdog
func (l *Lock) Unlock(ctx context.Context) error { err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() // 关闭看门狗 close(l.watchDog) return err }Red lockSince the above implementation is based on a single Redis instance, if this only instance hangs, all requests will fail because the lock cannot be obtained. In order To improve fault tolerance, we can use multiple Redis instances distributed on different machines, and as long as we get the locks of most of the nodes, we can successfully lock. This is the red lock algorithm. It is actually based on the single instance algorithm above, except that we need to acquire locks on multiple Redis instances at the same time.
在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000
获取锁,然后把成功获取锁的客户端放到一个channel
里(这里因为是多线程并发获取锁,使用slice可能有并发问题),同时使用sync.WaitGroup
等待所有获取锁操作结束。
然后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败,释放已经获得的锁。
如果加锁成功,则启动看门狗延长锁的过期时间。
func (l *RedLock) TryLock(ctx context.Context) error { randomValue := gofakeit.UUID() var wg sync.WaitGroup wg.Add(len(l.clients)) // 成功获得锁的Redis实例的客户端 successClients := make(chan *redis.Client, len(l.clients)) for _, client := range l.clients { go func(client *redis.Client) { defer wg.Done() success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result() if err != nil { return } // 加锁失败 if !success { return } // 加锁成功,启动看门狗 go l.startWatchDog() successClients <- client }(client) } // 等待所有获取锁操作完成 wg.Wait() close(successClients) // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败 if len(successClients) < len(l.clients)/2+1 { // 就算加锁失败,也要把已经获得的锁给释放掉 for client := range successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl) l.script.Run(ctx, client, []string{l.resource}, randomValue) cancel() }(client) } return ErrLockFailed } // 加锁成功,启动看门狗 l.randomValue = randomValue l.successClients = nil for successClient := range successClients { l.successClients = append(l.successClients, successClient) } return nil }
我们需要延长所有成功获取到的锁的过期时间。
func (l *RedLock) startWatchDog() { l.watchDog = make(chan struct{}) ticker := time.NewTicker(resetTTLInterval) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 for _, client := range l.successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval) client.Expire(ctx, l.resource, ttl) cancel() }(client) } case <-l.watchDog: // 已经解锁 return } } }
我们需要解锁所有成功获取到的锁。
func (l *RedLock) Unlock(ctx context.Context) error { for _, client := range l.successClients { go func(client *redis.Client) { l.script.Run(ctx, client, []string{l.resource}, l.randomValue) }(client) } // 关闭看门狗 close(l.watchDog) return nil }
The above is the detailed content of How to use Go and Redis to implement distributed mutex locks and red locks. For more information, please follow other related articles on the PHP Chinese website!