ホームページ >データベース >Redis >Golang 分散アプリケーションで Redis を使用する方法

Golang 分散アプリケーションで Redis を使用する方法

王林
王林転載
2023-05-26 22:07:36879ブラウズ

    本文

    Redis は、分散システムでよく使用される高性能のインメモリ データベースです。分散キャッシュまたは単純な機能に加えて、データベースには特別なアプリケーション シナリオもいくつかあり、この記事では Golang を組み合わせて対応するミドルウェアを作成します。

    分散ロック

    スタンドアロン システムでは、sync.Mutex を使用して重要なリソースを保護できますが、分散システムでもそのようなニーズがあります。複数のホスト 同じリソースを占有するには、対応する「分散ロック」を追加する必要があります。

    Redis では、setnx コマンドを使用してこれを実現できます

    • キーが存在しない場合は、対応する値を設定できます。設定が成功すると、ロックが成功します。 、キーが存在しないため、失敗が返されます。

    • ロックの解除は、del を通じて実行できます。

    主なロジックは次のとおりです:

    type RedisLock struct {
    	client     *redis.Client
    	key        string
    	expiration time.Duration // 过期时间,防止宕机或者异常
    }
    func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
    	return &RedisLock{
    		client:     client,
    		key:        key,
    		expiration: expiration,
    	}
    }
    // 加锁将成功会将调用者id保存到redis中
    func (l *RedisLock) Lock(id string) (bool, error) {
    	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
    }
    const unLockScript = `
    if (redis.call("get", KEYS[1]) == KEYS[2]) then
    	redis.call("del", KEYS[1])
    	return true
    end
    return false
    `
    // 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁
    func (l *RedisLock) UnLock(id string) error {
    	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }

    異常なリクエストによって引き起こされるシステムのダウンタイムやデッドロックを防ぐために、追加のタイムアウト期間を追加する必要があります。最大推定実行時間の 2 倍に設定する必要があります。

    Lua スクリプトはロック解除時のアトミック性を確保するために使用され、呼び出し側は自分自身で追加されたロックのみをロック解除します。タイムアウトによる混乱を避けてください。例: プロセス A は時刻 t1 にロックを取得しましたが、実行が遅いため、ロックは時刻 t2 にタイムアウトしました。プロセス B は t3 にロックを取得しました。プロセス A が実行を終了してプロセスのロックを解除すると、プロセスはキャンセルされます。B のロック。

    テストを実行します

    func main() {
        client := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	lock := NewLock(client, "counter", 30*time.Second)
        counter := 0
    	worker := func(i int) {
    		for {
    			id := fmt.Sprintf("worker%d", i)
    			ok, err := lock.Lock(id)
    			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
    			if !ok {
    				time.Sleep(100 * time.Millisecond)
    				continue
    			}
    			defer lock.UnLock(id)
    			counter++
    			log.Printf("worker %d, add counter %d", i, counter)
    			break
    		}
    	}
    	wg := sync.WaitGroup{}
    	for i := 1; i <= 5; i++ {
    		wg.Add(1)
    		id := i
    		go func() {
    			defer wg.Done()
    			worker(id)
    		}()
    	}
    	wg.Wait()
    }

    実行結果は、効果が sync.Mutex

    2022/07/22 09 と同様であることを示しています。 : 58:09 ワーカー 5 がロックを取得しようとしました。ok: true、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:09 ワーカー 5、カウンタ 1 を追加します
    2022/07/22 09: 58 :09 ワーカー 4 がロックを取得しようとしました、ok: false、エラー:
    #2022/07/22 09:58:09 ワーカー 1 がロックを取得しようとしました、ok: false、エラー:
    2022/07/22 09:58:09 ワーカー 2 がロックを取得しようとしました、ok: false、err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:09 ワーカー 3 がロックを取得しようとしました、 ok : false、err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 ワーカー 3 がロックを取得しようとしました、ok: false、err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09: 58 :10 ワーカー 1 がロックを取得しようとしました、ok: false、エラー:
    #2022/07/22 09:58:10 ワーカー 2 がロックを取得しようとしました、ok: false、エラー:
    2022/07/22 09:58:10 ワーカー 4 がロックを取得しようとしました。ok: true、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 ワーカー 4、カウンタ 2 を追加
    2022/07/22 09:58:10 ワーカー 1 がロックを取得しようとしました。ok: true、err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 ワーカー 1、カウンター 3 を追加
    2022/07/22 09:58:10 ワーカー 3 がロックを取得しようとしました、ok: false、err:
    2022/07/22 09:58:10 ワーカー 2 がロックを取得しようとしました、ok: false 、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 ワーカー 2 がロックを取得しようとしました、ok: true、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58: 10 ワーカー 2、カウンタ 4 を追加します
    2022/07/22 09:58:10 ワーカー 3 がロックを取得しようとしました、OK: false、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10ワーカー 3 はロックを取得しようとしました、ok: true、エラー: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 ワーカー 3、カウンタ 5 を追加

    特別な注意は、分散 Redis クラスターでは、例外が発生した場合 (マスター ノードがダウンした場合)、分散ロックの可用性が低下する可能性があります。これは、etcd や ZooKeeper などの強力な整合性コンポーネントによって実現できます。

    分散フィルタ

    数百万の Web ページをクロールするクローラー サービスを開発したいとします。特定の Web ページがクロールされたかどうかを判断するには、データベースと HashMap を使用するだけでなく、ブルームフィルターを使用してそれを行うことができます。他の方法と比較して、ブルーム フィルターは占有スペースが非常に少なく、挿入時間とクエリ時間が非常に速くなります。

    ブルーム フィルターは、BitSet を使用して、要素がセット内にあるかどうかを判断するために使用されます。

    • データを挿入するとき、値は複数回ハッシュされ、対応する BitSet位置は 1

    • ##クエリを実行する場合、ハッシュも複数回実行され、すべてのビットが 1 であるかどうかが比較されます。そうであれば、それは存在します。

    ブルーム フィルターには一定の誤判定率があり、正確なクエリ シナリオには適していません。また、要素の削除はサポートされていません。通常、URL の重複排除、スパム フィルタリング、キャッシュの破損防止などのシナリオで使用されます。

    Redis では、組み込みの BitSet 実装を使用できるほか、lua スクリプトのアトミック性を使用して、複数のクエリ データの不整合を回避できます。

    const (
    	// 插入数据,调用setbit设置对应位
    	setScript = `
    for _, offset in ipairs(ARGV) do
    	redis.call("setbit", KEYS[1], offset, 1)
    end
    `
    	// 查询数据,如果所有位都为1返回true
    	getScript = `
    for _, offset in ipairs(ARGV) do
    	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
    		return false
    	end
    end
    return true
    `
    )
    type BloomFilter struct {
    	client *redis.Client
    	key    string // 存在redis中的key
    	bits   uint // BitSet的大小
    	maps   uint // Hash的次数
    }
    func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
    	client.Del(context.TODO(), key)
    	if maps == 0 {
    		maps = 14
    	}
    	return &BloomFilter{
    		key:    key,
    		client: client,
    		bits:   bits,
    		maps:   maps,
    	}
    }
    // 进行多次Hash, 得到位置列表
    func (f *BloomFilter) getLocations(data []byte) []uint {
    	locations := make([]uint, f.maps)
    	for i := 0; i < int(f.maps); i++ {
    		val := murmur3.Sum64(append(data, byte(i)))
    		locations[i] = uint(val) % f.bits
    	}
    	return locations
    }
    func (f *BloomFilter) Add(data []byte) error {
    	args := getArgs(f.getLocations(data))
    	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    func (f *BloomFilter) Exists(data []byte) (bool, error) {
    	args := getArgs(f.getLocations(data))
    	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
    	if err != nil {
    		if err == redis.Nil {
    			return false, nil
    		}
    		return false, err
    	}
    	exists, ok := resp.(int64)
    	if !ok {
    		return false, nil
    	}
    	return exists == 1, nil
    }
    func getArgs(locations []uint) []string {
    	args := make([]string, 0)
    	for _, l := range locations {
    		args = append(args, strconv.FormatUint(uint64(l), 10))
    	}
    	return args
    }

    テストの実行

    func main() {
    	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
    	exists, err := bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	if err := bf.Add([]byte("test1")); err != nil {
    		log.Printf("add err: %v", err)
    	}
    	exists, err = bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	exists, err = bf.Exists([]byte("test2"))
    	log.Printf("exist %t, err %v", exists, err)
    // output
    // 2022/07/22 10:05:58 exist false, err <nil>
    // 2022/07/22 10:05:58 exist true, err <nil>
    // 2022/07/22 10:05:58 exist false, err <nil>
    }

    分散レート リミッタ

    トークンベースは

    golang.org/x/time/rate パッケージ バケットで提供されます電流リミッター。分散環境で電流制限を実装する場合は、Redis Lua スクリプトに基づいて実装できます。

    トークン バケットの主な原理は次のとおりです。

    • トークン バケットの容量がバーストし、トークンが1 秒あたりの qps

    • 最初に、トークンはいっぱいになります。トークンがオーバーフローした場合、トークンは直接破棄されます。トークンをリクエストするとき、バケットに十分なトークンがあれば許可されます。

    • burst==qps の場合は、qps フロー制限に厳密に従ってください。burst>qps の場合は、特定のバースト トラフィックが許可されます

    这里主要参考了官方rate包的实现,将核心逻辑改为Lua实现。

    --- 相关Key
    --- limit rate key值,对应value为当前令牌数
    local limit_key = KEYS[1]
    --- 输入参数
    --[[
    qps: 每秒请求数;
    burst: 令牌桶容量;
    now: 当前Timestamp;
    cost: 请求令牌数;
    max_wait: 最大等待时间
    --]]
    local qps = tonumber(ARGV[1])
    local burst = tonumber(ARGV[2])
    local now = ARGV[3]
    local cost = tonumber(ARGV[4])
    local max_wait = tonumber(ARGV[5])
    --- 获取redis中的令牌数
    local tokens = redis.call("hget", limit_key, "token")
    if not tokens then
    	tokens = burst
    end
    --- 上次修改时间
    local last_time = redis.call("hget", limit_key, "last_time")
    if not last_time then
    	last_time = 0
    end
    --- 最新等待时间
    local last_event = redis.call("hget", limit_key, "last_event")
    if not last_event then
    	last_event = 0
    end
    --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
    local delta = math.max(0, now-last_time)
    local new_tokens = math.min(burst, delta * qps + tokens)
    new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
    --- 如果最新令牌数小于0,计算需要等待的时间
    local wait_period = 0
    if new_tokens < 0 and qps > 0 then
    	wait_period = wait_period - new_tokens / qps
    end
    wait_period = math.ceil(wait_period)
    local time_act = now + wait_period --- 满足等待间隔的时间戳
    --- 允许请求有两种情况
    --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
    --- qps为0时,只要最新令牌数不小于0即可
    local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
    --- 设置对应值
    if ok then
    	redis.call("set", limit_key, new_tokens)
    	redis.call("set", last_time_key, now)
    	redis.call("set", last_event_key, time_act)
    end
    --- 返回列表,{是否允许, 等待时间}
    return {ok, wait_period}

    在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现

    // 调用lua脚本
    func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
    	// ...
    	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
    	if err != nil && err != redis.Nil {
    		return nil, err
    	}
    	//...
    	return &Reservation{
    		ok:        allow == 1,
    		lim:       lim,
    		tokens:    n,
    		timeToAct: now.Add(time.Duration(wait) * time.Second),
    	}, nil
    }

    运行测试

    func main() {
    	rdb := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
    	if err != nil {
    		log.Fatal(err)
    	}
    	r.Reset()
    	for i := 0; i < 5; i++ {
    		err := r.Wait(context.TODO())
    		log.Printf("worker %d allowed: %v", i, err)
    	}
    }
    // output
    // 2022/07/22 12:50:31 worker 0 allowed: <nil>
    // 2022/07/22 12:50:31 worker 1 allowed: <nil>
    // 2022/07/22 12:50:32 worker 2 allowed: <nil>
    // 2022/07/22 12:50:33 worker 3 allowed: <nil>
    // 2022/07/22 12:50:34 worker 4 allowed: <nil>

    前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。

    其他

    Redis还可用于全局计数、去重以及发布订阅等不同情境。参考Redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。

    以上がGolang 分散アプリケーションで Redis を使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。