Rumah > Artikel > pangkalan data > Cara menggunakan Redis dalam aplikasi yang diedarkan Golang
Redis ialah pangkalan data dalam memori berprestasi tinggi yang sering digunakan dalam sistem teragih sebagai tambahan kepada cache teragih atau ringkas memori Pangkalan data juga mempunyai beberapa senario aplikasi khas Artikel ini menggabungkan Golang untuk menulis middleware yang sepadan.
Dalam sistem yang berdiri sendiri, kita boleh menggunakan sync.Mutex
untuk melindungi sumber kritikal Terdapat juga keperluan sedemikian dalam sistem teragih , adalah perlu Tambah "kunci teragih" yang sepadan.
Dalam Redis kita boleh menggunakan perintah setnx
untuk mencapai ini
Jika kunci tidak wujud, anda boleh menetapkan nilai yang sepadan jika tetapan berjaya , kunci akan berjaya. Kunci tidak wujud
melepaskan kunci boleh dicapai melalui del
.
Logik utama adalah seperti berikut:
type RedisLock struct { client *redis.Client key string expiration time.Duration // 过期时间,防止宕机或者异常 } func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock { return &RedisLock{ client: client, key: key, expiration: expiration, } } // 加锁将成功会将调用者id保存到redis中 func (l *RedisLock) Lock(id string) (bool, error) { return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result() } const unLockScript = ` if (redis.call("get", KEYS[1]) == KEYS[2]) then redis.call("del", KEYS[1]) return true end return false ` // 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁 func (l *RedisLock) UnLock(id string) error { _, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result() if err != nil && err != redis.Nil { return err } return nil }
Untuk mengelakkan masa henti sistem atau kebuntuan yang disebabkan oleh permintaan yang tidak normal, tempoh tamat masa tambahan perlu ditambah, yang sepatutnya ditetapkan kepada Dua kali ganda anggaran masa larian maksimum.
Skrip Lua digunakan untuk memastikan atomicity semasa membuka kunci Pemanggil hanya akan membuka kunci yang ditambahkan dengan sendirinya. Elakkan kekeliruan yang disebabkan oleh tamat masa Contohnya: Proses A memperoleh kunci pada masa t1, tetapi disebabkan pelaksanaan yang perlahan, kunci tamat masa pada masa t2. Proses B memperoleh kunci pada t3. proses akan dibatalkan kunci B.
func main() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) lock := NewLock(client, "counter", 30*time.Second) counter := 0 worker := func(i int) { for { id := fmt.Sprintf("worker%d", i) ok, err := lock.Lock(id) log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err) if !ok { time.Sleep(100 * time.Millisecond) continue } defer lock.UnLock(id) counter++ log.Printf("worker %d, add counter %d", i, counter) break } } wg := sync.WaitGroup{} for i := 1; i <= 5; i++ { wg.Add(1) id := i go func() { defer wg.Done() worker(id) }() } wg.Wait() }
Keputusan menunjukkan bahawa kesannya adalah serupa dengan sync.Mutex
2022/07/22 09:58:09 pekerja 5 percubaan untuk mendapatkan kunci, ok: benar, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:09 pekerja 5, tambah kaunter 1
2022/07/22 09:58:09 pekerja 4 cuba untuk dapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:09 pekerja 1 percubaan untuk mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/ 22 09:58:09 pekerja 2 percubaan untuk mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:09 pekerja 3 percubaan untuk mendapatkan kunci, ok: palsu, err: < ;nil>
2022/07/22 09:58:10 pekerja 3 percubaan untuk mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 1 percubaan untuk dapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 2 cuba mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/ 22 09:58:10 pekerja 4 percubaan untuk mendapatkan kunci, ok: benar, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 4, tambah kaunter 2
2022/07/22 09:58:10 pekerja 1 percubaan untuk mendapatkan kunci, ok: benar, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 1, tambah kaunter 3
2022/07/22 09 :58:10 pekerja 3 percubaan untuk mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 2 cuba mendapatkan kunci, ok: palsu, err:2022/07/22 09:58:10 pekerja 2 cuba mendapatkan kunci, ok: benar, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 2, tambah kaunter 4
2022/07/22 09:58:10 pekerja 3 cuba mendapatkan kunci, ok: palsu, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 3 cuba mendapatkan kunci, ok: benar, err: 2d77b2345c34a631c3d251f57ce68620
2022/07/22 09:58:10 pekerja 3, tambah kaunter 5
Perhatian khusus ialah dalam gugusan Redis yang diedarkan, Jika pengecualian berlaku (nod induk turun), ketersediaan kunci yang diedarkan mungkin dikurangkan, yang boleh dicapai melalui komponen konsistensi yang kuat seperti etcd dan ZooKeeper.
Andaikan kami ingin membangunkan perkhidmatan perangkak untuk merangkak berjuta-juta halaman web Bagaimana untuk menentukan sama ada halaman web tertentu telah dirangkak, selain menggunakan pangkalan data dan HashMap, kami boleh menggunakan penapis Bloom digunakan untuk melakukannya. Berbanding kaedah lain, penapis Bloom mengambil ruang yang sangat sedikit dan mempunyai masa pemasukan dan pertanyaan yang sangat pantas.
Penapis Bloom digunakan untuk menentukan sama ada elemen berada dalam set Gunakan BitSet
untuk mencincang nilai beberapa kali apabila memasukkan data dan menetapkan kedudukan yang sepadan bagi BitSet to 1
Apabila membuat pertanyaan, berbilang operasi Hash juga dilakukan untuk membandingkan sama ada semua bit ialah 1. Jika ya, ia wujud.
Penapis Bloom mempunyai kadar salah penilaian tertentu dan tidak sesuai untuk senario pertanyaan yang tepat. Selain itu, pemadaman elemen tidak disokong. Ia biasanya digunakan dalam senario seperti penyahduplikasian URL, penapisan spam dan pencegahan pecahan cache.
Dalam Redis, kami boleh menggunakan pelaksanaan BitSet terbina dalam, dan juga menggunakan atomicity skrip lua untuk mengelakkan berbilang ketidakkonsistenan data pertanyaan.
const ( // 插入数据,调用setbit设置对应位 setScript = ` for _, offset in ipairs(ARGV) do redis.call("setbit", KEYS[1], offset, 1) end ` // 查询数据,如果所有位都为1返回true getScript = ` for _, offset in ipairs(ARGV) do if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then return false end end return true ` ) type BloomFilter struct { client *redis.Client key string // 存在redis中的key bits uint // BitSet的大小 maps uint // Hash的次数 } func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter { client.Del(context.TODO(), key) if maps == 0 { maps = 14 } return &BloomFilter{ key: key, client: client, bits: bits, maps: maps, } } // 进行多次Hash, 得到位置列表 func (f *BloomFilter) getLocations(data []byte) []uint { locations := make([]uint, f.maps) for i := 0; i < int(f.maps); i++ { val := murmur3.Sum64(append(data, byte(i))) locations[i] = uint(val) % f.bits } return locations } func (f *BloomFilter) Add(data []byte) error { args := getArgs(f.getLocations(data)) _, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result() if err != nil && err != redis.Nil { return err } return nil } func (f *BloomFilter) Exists(data []byte) (bool, error) { args := getArgs(f.getLocations(data)) resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result() if err != nil { if err == redis.Nil { return false, nil } return false, err } exists, ok := resp.(int64) if !ok { return false, nil } return exists == 1, nil } func getArgs(locations []uint) []string { args := make([]string, 0) for _, l := range locations { args = append(args, strconv.FormatUint(uint64(l), 10)) } return args }
func main() { bf := NewBloomFilter(client,"bf-test", 2^16, 14) exists, err := bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) if err := bf.Add([]byte("test1")); err != nil { log.Printf("add err: %v", err) } exists, err = bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) exists, err = bf.Exists([]byte("test2")) log.Printf("exist %t, err %v", exists, err) // output // 2022/07/22 10:05:58 exist false, err <nil> // 2022/07/22 10:05:58 exist true, err <nil> // 2022/07/22 10:05:58 exist false, err <nil> }
Penghad arus berasaskan baldi token disediakan dalam pakej golang.org/x/time/rate
, jika anda ingin melaksanakan persekitaran teragih Pengehadan semasa boleh dilaksanakan berdasarkan skrip Redis Lua.
Prinsip utama baldi token adalah seperti berikut:
Anggapkan bahawa kapasiti baldi token pecah, dan token diletakkan di dalamnya pada kadar qps setiap saat
Pada mulanya, token diisi Jika token melimpah, ia akan dibuang terus apabila meminta token, jika terdapat cukup token dalam baldi, ia akan dibenarkan , jika tidak, ia akan ditolak
Apabila pecah==qps, had aliran benar-benar mengikut qps apabila pecah>qps, letusan trafik tertentu boleh dibenarkan
这里主要参考了官方rate
包的实现,将核心逻辑改为Lua实现。
--- 相关Key --- limit rate key值,对应value为当前令牌数 local limit_key = KEYS[1] --- 输入参数 --[[ qps: 每秒请求数; burst: 令牌桶容量; now: 当前Timestamp; cost: 请求令牌数; max_wait: 最大等待时间 --]] local qps = tonumber(ARGV[1]) local burst = tonumber(ARGV[2]) local now = ARGV[3] local cost = tonumber(ARGV[4]) local max_wait = tonumber(ARGV[5]) --- 获取redis中的令牌数 local tokens = redis.call("hget", limit_key, "token") if not tokens then tokens = burst end --- 上次修改时间 local last_time = redis.call("hget", limit_key, "last_time") if not last_time then last_time = 0 end --- 最新等待时间 local last_event = redis.call("hget", limit_key, "last_event") if not last_event then last_event = 0 end --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数 local delta = math.max(0, now-last_time) local new_tokens = math.min(burst, delta * qps + tokens) new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌 --- 如果最新令牌数小于0,计算需要等待的时间 local wait_period = 0 if new_tokens < 0 and qps > 0 then wait_period = wait_period - new_tokens / qps end wait_period = math.ceil(wait_period) local time_act = now + wait_period --- 满足等待间隔的时间戳 --- 允许请求有两种情况 --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求 --- qps为0时,只要最新令牌数不小于0即可 local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0) --- 设置对应值 if ok then redis.call("set", limit_key, new_tokens) redis.call("set", last_time_key, now) redis.call("set", last_event_key, time_act) end --- 返回列表,{是否允许, 等待时间} return {ok, wait_period}
在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现
// 调用lua脚本 func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) { // ... res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result() if err != nil && err != redis.Nil { return nil, err } //... return &Reservation{ ok: allow == 1, lim: lim, tokens: n, timeToAct: now.Add(time.Duration(wait) * time.Second), }, nil }
func main() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) r, err := NewRedisLimiter(rdb, 1, 2, "testrate") if err != nil { log.Fatal(err) } r.Reset() for i := 0; i < 5; i++ { err := r.Wait(context.TODO()) log.Printf("worker %d allowed: %v", i, err) } } // output // 2022/07/22 12:50:31 worker 0 allowed: <nil> // 2022/07/22 12:50:31 worker 1 allowed: <nil> // 2022/07/22 12:50:32 worker 2 allowed: <nil> // 2022/07/22 12:50:33 worker 3 allowed: <nil> // 2022/07/22 12:50:34 worker 4 allowed: <nil>
前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。
Redis还可用于全局计数、去重以及发布订阅等不同情境。参考Redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。
Atas ialah kandungan terperinci Cara menggunakan Redis dalam aplikasi yang diedarkan Golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!