Heim  >  Artikel  >  Datenbank  >  Lassen Sie uns ausführlich über verteilte Sperren in Redis sprechen

Lassen Sie uns ausführlich über verteilte Sperren in Redis sprechen

青灯夜游
青灯夜游nach vorne
2023-04-06 18:45:201332Durchsuche

Wir alle wissen, dass verteilte Sperren in einer verteilten Umgebung verwendet werden müssen. Welche Eigenschaften benötigen verteilte Sperren? Wie sperre ich eigenständige Redis? Was sind die Fallstricke der Redis-Cluster-Sperre? Machen Sie sich keine Sorgen, lassen Sie uns Schritt für Schritt den Schleier der verteilten Redis-Sperre lüften.

Lassen Sie uns ausführlich über verteilte Sperren in Redis sprechen

Eigenschaften verteilter Sperren

  • 1. Exklusivität

Nur ein Thread kann die Sperre unter allen Umständen halten.

  • 2. Hohe Verfügbarkeit

Die Redis-Cluster-Umgebung kann es nicht versäumen, Sperren zu erwerben oder freizugeben, weil ein Knoten ausgefallen ist. [Verwandte Empfehlungen: Redis-Video-Tutorial]

  • 3. Anti-Deadlock

muss über einen Timeout-Kontrollmechanismus verfügen oder den Vorgang rückgängig machen.

  • 4. Schnappen Sie sich nichts

Sperren Sie es selbst und geben Sie es selbst frei. Von anderen hinzugefügte Sperren können nicht aufgehoben werden.

  • 5. Wiedereintritt

Derselbe Thread kann mehrmals gesperrt werden.

So implementieren Sie Redis auf einem einzelnen Computer

Im Allgemeinen wird es mit dem Setnx+Lua-Skript implementiert.

Posten Sie den Code direkt

package com.fandf.test.redis;

import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * redis 单机锁
 *
 * @author fandongfeng
 * @date 2023/3/29 06:52
 */
@Slf4j
@Service
public class RedisLock {

    @Resource
    RedisTemplate<String, Object> redisTemplate;

    private static final String SELL_LOCK = "kill:";

    /**
     * 模拟秒杀
     *
     * @return 是否成功
     */
    public String kill() {

        String productId = "123";
        String key = SELL_LOCK + productId;
        //锁value,解锁时 用来判断当前锁是否是自己加的
        String value = IdUtil.fastSimpleUUID();
        //加锁 十秒钟过期 防死锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
        if (!flag) {
            return "加锁失败";
        }
        try {
            String productKey = "good123";
            //获取商品库存
            Integer stock = (Integer) redisTemplate.opsForValue().get(productKey);
            if (stock == null) {
                //模拟录入数据, 实际应该加载时从数据库读取
                redisTemplate.opsForValue().set(productKey, 100);
                stock = 100;
            }
            if (stock <= 0) {
                return "卖完了,下次早点来吧";
            }
            //扣减库存, 模拟随机卖出数量
            int randomInt = RandomUtil.randomInt(1, 10);
            redisTemplate.opsForValue().decrement(productKey, randomInt);
            // 修改db,可以丢到队列里慢慢处理
            return "成功卖出" + randomInt + "个,库存剩余" + redisTemplate.opsForValue().get(productKey) + "个";
        } finally {

//            //这种方法会存在删除别人加的锁的可能
//            redisTemplate.delete(key);

//            if(value.equals(redisTemplate.opsForValue().get(key))){
//                //因为if条件的判断和 delete不是原子性的,
//                //if条件判断成功后,恰好锁到期自己解锁
//                //此时别的线程如果持有锁了,就会把别人的锁删除掉
//                redisTemplate.delete(key);
//            }

            //使用lua脚本保证判断和删除的原子性
            String luaScript =
                    "if (redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1]) then " +
                            "return redis.call(&#39;del&#39;,KEYS[1]) " +
                            "else " +
                            "return 0 " +
                            "end";
            redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(key), value);
        }
    }


}

Führen Sie Unit-Tests durch, simulieren Sie hundert Threads, um gleichzeitig einen Flash-Kill durchzuführen

package com.fandf.test.redis;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;

/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-24 16:45
 */
@SpringBootTest
class SignServiceTest {

  
    @Resource
    RedisLock redisLock;


    @RepeatedTest(100)
    @Execution(CONCURRENT)
    public void redisLock() {
        String result = redisLock.kill();
        if("加锁失败".equals(result)) {

        }else {
            System.out.println(result);
        }
    }
}

Nur drei Threads haben die Sperre ergriffen

成功卖出5个,库存剩余95个
成功卖出8个,库存剩余87个
成功卖出7个,库存剩余80个

Was stimmt mit der Redis-Sperre nicht?

Im Allgemeinen gibt es zwei:

  • 1. Wiedereintritt nicht möglich.
  • 2. Um einen Deadlock zu verhindern, fügen wir beim Sperren eine Ablaufzeit hinzu. In den meisten Fällen basiert diese Zeit auf der Erfahrungsauswertung des bestehenden Geschäfts, aber im Falle einer Programmblockierung oder Ausnahme wird die Ausführung lange dauern Nach längerer Zeit wird die Sperre automatisch aufgehoben, wenn sie abläuft. Wenn zu diesem Zeitpunkt andere Threads die Sperre erhalten und Logik ausführen, können Probleme auftreten.

Gibt es also eine Möglichkeit, diese beiden Probleme zu lösen? Ja, lass uns über Redisson reden

Redisson implementiert verteilte Sperren

Was ist Redisson?

Redisson ist ein Java-In-Memory-Datengrid (In-Memory Data Grid), das auf Basis von Redis implementiert wird. Es stellt nicht nur eine Reihe verteilter gemeinsamer Java-Objekte bereit, sondern auch viele verteilte Dienste. Dazu gehört (BitSetSetMultimapSortedSetMapListQueueBlockingQueueDequeBlockingDequeSemaphoreLockAtomicLongCountDownLatchPublish / SubscribeBloom filterRemote serviceSpring cacheExecutor serviceLive Object serviceScheduler service) Redisson bietet die einfachste und bequemste Möglichkeit, Redis zu verwenden. Der Zweck von Redisson besteht darin, die Trennung der Benutzer von Redis zu fördern, damit sich Benutzer mehr auf die Verarbeitung der Geschäftslogik konzentrieren können.

Springboot integriert Redisson

Die Integration ist sehr einfach, nur zwei Schritte.

  1. pom führt Abhängigkeiten ein reee
  2. Freunde, die Redisson möglicherweise nicht kennen, können nicht anders, als Fragen zu stellen.
Was? Müssen Sie beim Sperren keine Ablaufzeit hinzufügen? Wird dies zu einem Stillstand führen? Ist es nicht notwendig zu beurteilen, ob Sie es selbst halten, um es zu entsperren?
    Haha, keine Sorge, wir werden Redisson Schritt für Schritt enthüllen.
  1. Verfolgung des Quellcodes von Redisson lock()

Folgen wir Schritt für Schritt der lock()-Methode, um uns den Quellcode anzusehen (die lokale Redisson-Version ist 3.20.0)

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

Sehen Sie sich die Sperre an(-1, null, false ); Methode

spring:
  application:
    name: test
  redis:
    host: 127.0.0.1
    port: 6379

us Schauen wir uns an, wie es gesperrt wird, nämlich die tryAcquire-Methode

package com.fandf.test.redis;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author fandongfeng
 */
@Component
@Slf4j
public class RedissonTest {

    @Resource
    RedissonClient redissonClient;

    public void test() {
        RLock rLock = redissonClient.getLock("anyKey");
        //rLock.lock(10, TimeUnit.SECONDS);
        rLock.lock();
        try {
            // do something
        } catch (Exception e) {
            log.error("业务异常", e);
        } finally {
            rLock.unlock();
        }

    }
    
}
e

Der obige Code enthält zuerst die Logik des Sperrens und der Sperreerneuerung

//RedissonLock.class

@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

It Hier wird sehr deutlich, dass Redisson das Lua-Skript verwendet, um die Atomizität der Befehle sicherzustellen.

redis.call('hexists', KEYS[1], ARGV[2]) Überprüfen Sie, ob der Schlüsselwert vorhanden ist.

Mit dem Redis Hexists-Befehl wird überprüft, ob das angegebene Feld der Hash-Tabelle vorhanden ist.

Wenn die Hash-Tabelle das angegebene Feld enthält, geben Sie 1 zurück. Wenn die Hash-Tabelle das angegebene Feld nicht enthält oder der Schlüssel nicht existiert, wird 0 zurückgegeben.

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //获取当前线程id
        long threadId = Thread.currentThread().getId();
        //加锁代码块, 返回锁的失效时间
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

Wenn der Schlüssel nicht existiert oder bereits ein bestimmtes Feld enthält (d. h. er wurde gesperrt, um Wiedereintritt zu erreichen), addieren Sie direkt 1 zum Wert des Feldes.

Der Wert dieses Felds ARGV[2], erhalten durch die Methode getLockName(threadId), schauen wir uns den Wert dieses Feldes an Code ScheduleExpirationRenewal( threadId);

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //真假加锁方法 tryAcquireAsync
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

Werfen wir einen Blick auf die Methode renewExpiration()

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //新建一个线程执行
    Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //设置锁过期时间为30秒
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can&#39;t update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                //检查锁是还否存在
                if (res) {
                    // reschedule itself 10后调用自己
                    renewExpiration();
                } else {
                    //关闭续约
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    //注意上行代码internalLockLeaseTime / 3,
    //internalLockLeaseTime默认30s,那么也就是10s检查一次
    ee.setTimeout(task);
}

//设置锁过期时间为internalLockLeaseTime  也就是30s  lua脚本保证原子性
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。

我们写个测试类看看

package com.fandf.test.redis;

import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-2416:45
 */
@SpringBootTest
class RedissonTest {

    @Resource
    private RedissonClient redisson;


    @Test
    public void watchDog() throws InterruptedException {
        RLock lock = redisson.getLock("123");
        lock.lock();
        Thread.sleep(1000000);
    }
}

查看锁的过期时间,及是否续约

127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 26
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> ttl 123
(integer) 22
127.0.0.1:6379> ttl 123
(integer) 21
127.0.0.1:6379> ttl 123
(integer) 20
127.0.0.1:6379> ttl 123
(integer) 30
127.0.0.1:6379> ttl 123
(integer) 28
127.0.0.1:6379>

我们再改改代码,看看是否可重入和字段名称是否和我们预期一致

package com.fandf.test.redis;

import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * @Description:
 * @author: fandongfeng
 * @date: 2023-3-24 16:45
 */
@SpringBootTest
class RedissonTest {

    @Resource
    private RedissonClient redisson;


    @Test
    public void watchDog() throws InterruptedException {
        RLock lock = redisson.getLock("123");
        lock.lock();
        lock.lock();
        lock.lock();
        //加了三次锁,此时重入次数为3
        Thread.sleep(3000);
        //解锁一次,此时重入次数变为3
        lock.unlock();
        Thread.sleep(1000000);
    }
}
127.0.0.1:6379> keys *
1) "123"
127.0.0.1:6379>
127.0.0.1:6379> ttl 123
(integer) 24
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "3"
127.0.0.1:6379>
127.0.0.1:6379> hgetall 123
1) "df7f4c71-b57b-455f-acee-936ad8475e01:12"
2) "2"
127.0.0.1:6379>

我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。

Redlock算法

redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?

假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤

  • 1.客户端记录当前系统时间,以毫秒为单位
  • 2.依次尝试从5个redis实例中,使用相同key获取锁
    当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。
  • 3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
    只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功
  • 4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
  • 5.如果获取锁失败,所有的redis实例都会进行解锁
    防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题

这里有下面几个点需要注意:

  • 1.我们都知道单机的redis是cp的,但是集群情况下redis是ap的,所以运行Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。
  • 2.为了提高redis节点宕机的容错率,可以使用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运行,那么至少要部署2*1+1=3台主节点。

更多编程相关知识,请访问:编程视频!!

Das obige ist der detaillierte Inhalt vonLassen Sie uns ausführlich über verteilte Sperren in Redis sprechen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:juejin.cn. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen