首頁  >  文章  >  資料庫  >  Golang分散式應用程式之Redis怎麼使用

Golang分散式應用程式之Redis怎麼使用

王林
王林轉載
2023-05-26 22:07:36852瀏覽

    正文

    Redis作是高效能的記憶體資料庫,常被應用於分散式系統中,除了作為分散式快取或簡單的記憶體資料庫還有一些特殊的應用場景,本文結合Golang來寫對應的中間件。

    分散式鎖定

    單機系統中我們可以使用sync.Mutex來保護臨界資源,在分散式系統中同樣有這樣的需求,當多個主機搶佔同一個資源,需要加對應的「分散式鎖」。

    在Redis中我們可以透過setnx指令來實現

    • #如果key不存在可以設定對應的值,設定成功則加鎖成功,key不存在返回失敗

    • 釋放鎖定可以透過del實現。

    主要邏輯如下:

    type RedisLock struct {
    	client     *redis.Client
    	key        string
    	expiration time.Duration // 过期时间,防止宕机或者异常
    }
    func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
    	return &RedisLock{
    		client:     client,
    		key:        key,
    		expiration: expiration,
    	}
    }
    // 加锁将成功会将调用者id保存到redis中
    func (l *RedisLock) Lock(id string) (bool, error) {
    	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
    }
    const unLockScript = `
    if (redis.call("get", KEYS[1]) == KEYS[2]) then
    	redis.call("del", KEYS[1])
    	return true
    end
    return false
    `
    // 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁
    func (l *RedisLock) UnLock(id string) error {
    	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }

    為了防止系統宕機或例外請求導致的死鎖,需要增加一個額外的逾時時間,該逾時時間應設為最大估計運行時間的兩倍。

    解鎖時透過lua腳本來保證原子性,呼叫者只會解自己加的鎖。避免因逾時造成的混亂,例如:進程A在時間t1獲取了鎖,但由於執行緩慢,在時間t2鎖超時失效,進程B在t3獲取了鎖,這是如果進程A執行完去解鎖會取消進程B的鎖。

    運行測試

    func main() {
        client := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	lock := NewLock(client, "counter", 30*time.Second)
        counter := 0
    	worker := func(i int) {
    		for {
    			id := fmt.Sprintf("worker%d", i)
    			ok, err := lock.Lock(id)
    			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
    			if !ok {
    				time.Sleep(100 * time.Millisecond)
    				continue
    			}
    			defer lock.UnLock(id)
    			counter++
    			log.Printf("worker %d, add counter %d", i, counter)
    			break
    		}
    	}
    	wg := sync.WaitGroup{}
    	for i := 1; i <= 5; i++ {
    		wg.Add(1)
    		id := i
    		go func() {
    			defer wg.Done()
    			worker(id)
    		}()
    	}
    	wg.Wait()
    }

    運行結果,可以看到與sync.Mutex使用效果類似

    2022/07/22 09: 58:09 worker 5 attempt to obtain lock, ok: true, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:09 worker 5, 加碼 counter 1
    #2022/07/22 09:58 :09 worker 4 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620 ##2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:09 worker 3 attempt to obtain , ok : false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58 :10 worker 1 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620 ##2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 4, add counter 2
    2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 1, add counter 3
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false , err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 2, add counter 4
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: 2d77b2345c34a631c3d251f57ce68620
    2022/07/22 09:58:10 worker 3, add counter 5

    特別注意的是,在分散式Redis叢集中,如果發生異常時(主節點宕機),可能會降低分散式鎖的可用性,可以透過強一致性的元件etcd、ZooKeeper等實作。

    分散式過濾器

    假設要開發一個爬蟲服務,爬取百萬級的網頁,怎麼判斷某一個網頁是否爬取過,除了借助資料庫和HashMap,我們可以藉助布隆過濾器來做。相對於其他方法,布隆過濾器佔用空間非常少,且插入和查詢時間非常快。

    布隆過濾器用來判斷某個元素是否在集合中,利用BitSet

      插入資料時將值進行多次Hash,將BitSet對應位置1
    • 查詢時同樣進行多次Hash對比所有位上是否為1,如是則存在。
    • 布隆過濾器有一定的誤判率,不適合精確查詢的場景。另外也不支援刪除元素。通常適用於URL去重、垃圾郵件過濾、防止快取擊穿等場景。

    在Redis中,我們可以使用自帶的BitSet實現,同樣也藉助lua腳本的原子性來避免多次查詢資料不一致。

    const (
    	// 插入数据,调用setbit设置对应位
    	setScript = `
    for _, offset in ipairs(ARGV) do
    	redis.call("setbit", KEYS[1], offset, 1)
    end
    `
    	// 查询数据,如果所有位都为1返回true
    	getScript = `
    for _, offset in ipairs(ARGV) do
    	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
    		return false
    	end
    end
    return true
    `
    )
    type BloomFilter struct {
    	client *redis.Client
    	key    string // 存在redis中的key
    	bits   uint // BitSet的大小
    	maps   uint // Hash的次数
    }
    func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
    	client.Del(context.TODO(), key)
    	if maps == 0 {
    		maps = 14
    	}
    	return &BloomFilter{
    		key:    key,
    		client: client,
    		bits:   bits,
    		maps:   maps,
    	}
    }
    // 进行多次Hash, 得到位置列表
    func (f *BloomFilter) getLocations(data []byte) []uint {
    	locations := make([]uint, f.maps)
    	for i := 0; i < int(f.maps); i++ {
    		val := murmur3.Sum64(append(data, byte(i)))
    		locations[i] = uint(val) % f.bits
    	}
    	return locations
    }
    func (f *BloomFilter) Add(data []byte) error {
    	args := getArgs(f.getLocations(data))
    	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    func (f *BloomFilter) Exists(data []byte) (bool, error) {
    	args := getArgs(f.getLocations(data))
    	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
    	if err != nil {
    		if err == redis.Nil {
    			return false, nil
    		}
    		return false, err
    	}
    	exists, ok := resp.(int64)
    	if !ok {
    		return false, nil
    	}
    	return exists == 1, nil
    }
    func getArgs(locations []uint) []string {
    	args := make([]string, 0)
    	for _, l := range locations {
    		args = append(args, strconv.FormatUint(uint64(l), 10))
    	}
    	return args
    }

    執行測試

    func main() {
    	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
    	exists, err := bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	if err := bf.Add([]byte("test1")); err != nil {
    		log.Printf("add err: %v", err)
    	}
    	exists, err = bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	exists, err = bf.Exists([]byte("test2"))
    	log.Printf("exist %t, err %v", exists, err)
    // output
    // 2022/07/22 10:05:58 exist false, err <nil>
    // 2022/07/22 10:05:58 exist true, err <nil>
    // 2022/07/22 10:05:58 exist false, err <nil>
    }

    分散式限流器

    golang.org/x/time/rate

    套件中提供了基於令牌桶的限流器,如果要實現分散式環境的限流可以基於Redis Lua腳本實現。 令牌桶的主要原理如下:

      假設一個令牌桶容量為burst,每秒依照qps的速率往裡面放置令牌
    • 初始時放滿令牌,令牌溢出則直接丟棄,請求令牌時,如果桶中有足夠令牌則允許,否則拒絕
    • 當burst==qps時,嚴格依照qps限流;當burst>qps時,可以允許一定的突增流量

    这里主要参考了官方rate包的实现,将核心逻辑改为Lua实现。

    --- 相关Key
    --- limit rate key值,对应value为当前令牌数
    local limit_key = KEYS[1]
    --- 输入参数
    --[[
    qps: 每秒请求数;
    burst: 令牌桶容量;
    now: 当前Timestamp;
    cost: 请求令牌数;
    max_wait: 最大等待时间
    --]]
    local qps = tonumber(ARGV[1])
    local burst = tonumber(ARGV[2])
    local now = ARGV[3]
    local cost = tonumber(ARGV[4])
    local max_wait = tonumber(ARGV[5])
    --- 获取redis中的令牌数
    local tokens = redis.call("hget", limit_key, "token")
    if not tokens then
    	tokens = burst
    end
    --- 上次修改时间
    local last_time = redis.call("hget", limit_key, "last_time")
    if not last_time then
    	last_time = 0
    end
    --- 最新等待时间
    local last_event = redis.call("hget", limit_key, "last_event")
    if not last_event then
    	last_event = 0
    end
    --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
    local delta = math.max(0, now-last_time)
    local new_tokens = math.min(burst, delta * qps + tokens)
    new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
    --- 如果最新令牌数小于0,计算需要等待的时间
    local wait_period = 0
    if new_tokens < 0 and qps > 0 then
    	wait_period = wait_period - new_tokens / qps
    end
    wait_period = math.ceil(wait_period)
    local time_act = now + wait_period --- 满足等待间隔的时间戳
    --- 允许请求有两种情况
    --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
    --- qps为0时,只要最新令牌数不小于0即可
    local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
    --- 设置对应值
    if ok then
    	redis.call("set", limit_key, new_tokens)
    	redis.call("set", last_time_key, now)
    	redis.call("set", last_event_key, time_act)
    end
    --- 返回列表,{是否允许, 等待时间}
    return {ok, wait_period}

    在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现

    // 调用lua脚本
    func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
    	// ...
    	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
    	if err != nil && err != redis.Nil {
    		return nil, err
    	}
    	//...
    	return &Reservation{
    		ok:        allow == 1,
    		lim:       lim,
    		tokens:    n,
    		timeToAct: now.Add(time.Duration(wait) * time.Second),
    	}, nil
    }

    运行测试

    func main() {
    	rdb := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
    	if err != nil {
    		log.Fatal(err)
    	}
    	r.Reset()
    	for i := 0; i < 5; i++ {
    		err := r.Wait(context.TODO())
    		log.Printf("worker %d allowed: %v", i, err)
    	}
    }
    // output
    // 2022/07/22 12:50:31 worker 0 allowed: <nil>
    // 2022/07/22 12:50:31 worker 1 allowed: <nil>
    // 2022/07/22 12:50:32 worker 2 allowed: <nil>
    // 2022/07/22 12:50:33 worker 3 allowed: <nil>
    // 2022/07/22 12:50:34 worker 4 allowed: <nil>

    前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。

    其他

    Redis还可用于全局计数、去重以及发布订阅等不同情境。参考Redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。

    以上是Golang分散式應用程式之Redis怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除