我有一个程序,基本上有三种情况 - 设置键的值,获取值(如果存在),或者等到给定键的值可用。我最初的想法 - 创建一个带有 map[string]interface{}
的新类型 - 其中存储“持久”值。除此之外,为了等待一个值,我计划使用 map[string](chan struct{})
。当调用 set()
方法时,我会写入该通道,任何等待它的人都会知道该值在那里。
我事先不知道密钥 - 它们是随机的。我不确定如何正确实现 wait()
方法。
type Map struct { sync.Mutex m map[string]interface{} wait map[string]chan (struct{}) } func (m *Map) Set(key string, value interface{}) { m.ensureWaitChan(key) m.Lock() defer m.Unlock() m.m[key] = value // Signal to all waiting. m.wait[key] <- struct{}{} } func (m *Map) Wait(key string) interface{} { m.ensureWaitChan(key) m.Lock() value, ok := m.m[key] if ok { m.Unlock() return value } m.Unlock() // <------ Unlocked state where something might happen. <-m.wait[key] value := m.m[key] return value } // If the channel does not exist for those waiting - create it. func (m *Map) ensureWaitChan(key string) { m.Lock() defer m.Unlock() _, ok := m.wait[key] if ok { return } m.wait[key] = make(chan struct{}, 100) }
问题是 - wait()
中存在竞争条件 - 在我释放互斥体之后,在我开始侦听通道上的传入值之前。
处理这个问题的最佳方法是什么?欢迎任何其他关于如何实现这一点的建议,我相信一定有更好的方法来做到这一点。我不会以固定的时间间隔或类似的方式轮询该值。
您正在寻找的是同步映射和消息代理之间的混合。我们可以通过利用通信和同步通道来实现这一点,以便订阅者可以在消息发布后立即收到消息(如果消息尚未在缓存中)。
type Map struct { sync.Mutex m map[string]any subs map[string][]chan any } func (m *Map) Set(key string, value any) { m.Lock() defer m.Unlock() m.m[key] = value // Send the new value to all waiting subscribers of the key for _, sub := range m.subs[key] { sub <- value } delete(m.subs, key) } func (m *Map) Wait(key string) any { m.Lock() // Unlock cannot be deferred so we can unblock Set() while waiting value, ok := m.m[key] if ok { m.Unlock() return value } // if there is no value yet, subscribe to any new values for this key ch := make(chan any) m.subs[key] = append(m.subs[key], ch) m.Unlock() return <-ch }
由于订阅者在等待时必须解锁地图互斥体,因此他们无法安全地访问添加到地图中的新消息。我们通过自己的频道将新值直接发送给所有订阅者,这样我们就不需要在 set
中添加更多同步,以确保所有订阅者在解锁地图本身之前都满意。提前解锁地图将允许订阅者直接读取它,但也会允许同时插入新值,从而导致结果不一致。
正在运行的版本,还包括带有类型参数的通用 map
实现: https://go.dev /play/p/an7vrspdgmo
以上是等待映射中的值在 Go 中可用的详细内容。更多信息请关注PHP中文网其他相关文章!