Heim >Backend-Entwicklung >Golang >Synchronisierungsprobleme in diesem Golang-Programm
PHP-Editor Apple stellt Ihnen heute ein interessantes Thema vor – „Synchronisierungsprobleme in diesem Golang-Programm“. Beim Schreiben gleichzeitiger Programme stoßen wir häufig auf Synchronisationsprobleme, dh auf Konkurrenz und Koordination zwischen mehreren Threads. Als nebenläufige Programmiersprache bietet Golang eine Fülle von Synchronisationsmechanismen und -tools, aber es gibt auch einige häufige Synchronisationsprobleme, die unsere Aufmerksamkeit und Lösungen erfordern. In diesem Artikel werden diese Probleme im Detail untersucht und entsprechende Lösungen vorgestellt, damit jeder die Synchronisierungsherausforderungen in Golang besser verstehen und bewältigen kann. Lasst uns gemeinsam erkunden!
Ich versuche ein Programm zu erstellen, das als Proxyserver fungiert und dynamisch zu neuen Endpunkten wechseln kann. Aber ich bin auf ein Problem gestoßen, als ich switchovertonewendpoint()
后,仍然有一些代理对象连接到原始端点 8.8.8.8
anrief, sollte es ausgeschaltet sein.
package main import ( "net" "sync" "sync/atomic" "time" ) type proxy struct { id int32 from, to *net.tcpconn } var switchover int32 = 0 func setswitchover() { atomic.storeint32((*int32)(&switchover), 1) } func switchoverenabled() bool { return atomic.loadint32((*int32)(&switchover)) == 1 } var proxies map[int32]*proxy = make(map[int32]*proxy, 0) var proxyseq int32 = 0 var mu sync.rwmutex func addproxy(from *net.tcpconn) { mu.lock() proxyseq += 1 proxy := &proxy{id: proxyseq, from: from} proxies[proxyseq] = proxy mu.unlock() var toaddr string if switchoverenabled() { toaddr = "1.1.1.1" } else { toaddr = "8.8.8.8" } tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr) toconn, err := net.dialtcp("tcp", nil, tcpaddr) if err != nil { panic(err) } proxy.to = toconn } func switchovertonewendpoint() { mu.rlock() closedproxies := proxies mu.runlock() setswitchover() for _, proxy := range closedproxies { proxy.from.close() proxy.to.close() mu.lock() delete(proxies, proxy.id) mu.unlock() } } func main() { tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { time.sleep(time.second * 30) switchovertonewendpoint() }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go addproxy(clientconn) } }
Nachdem ich eine Weile darüber nachgedacht habe, vermute ich, dass das Problem
istmu.rlock() closedproxies := proxies mu.runlock()
Aber ich bin mir nicht sicher, ob dies die Grundursache ist und ob sie durch Folgendes behoben werden kann:
closedProxies := make([]*Proxy, 0) mu.RLock() for _, proxy := range proxies { closedProxies = append(closedProxies, proxy) } mu.RUnlock()
Können Fachleute Ideen oder Tipps geben, da dieser Fall schwer zu reproduzieren ist? Alle Kommentare sind willkommen. Dank im Voraus.
Veränderung ist notwendig. In der ursprünglichen Implementierung hatte latedproxies
die gleiche Zuordnung. Sehen Sie sich diese Demo an:
package main import "fmt" func main() { proxies := make(map[int]int, 0) for i := 0; i < 10; i++ { proxies[i] = i } closeproxies := proxies proxies[10] = 10 proxies[11] = 11 for k := range closeproxies { delete(proxies, k) } fmt.printf("items left: %d\n", len(proxies)) // output: // items left: 0 }
Aber das ist nicht die Grundursache. Kopierbar closeproxies
之后但在调用 setswitchover
之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies
. Ich denke, das ist die Grundursache.
Noch eine Frage. Schließen Sie diesen Proxy, bevor Sie das to
字段之前,将向 proxies
添加新代理。程序可能希望在设置 to
-Feld festlegen, was zu einer Panik führt.
Die Idee besteht darin, alle Endpunkte in einem Slice zusammenzufassen und jeden Endpunkt seine eigene Proxy-Liste verwalten zu lassen. Wir müssen also nur den Index des aktuellen Endpunkts im Auge behalten. Wenn wir zu einem anderen Endpunkt wechseln möchten, ändern wir einfach den Index und weisen den veralteten Endpunkt an, seinen Proxy zu löschen. Die einzige verbleibende Komplikation besteht darin, sicherzustellen, dass veraltete Endpunkte alle ihre Proxys löschen. Siehe Implementierung unten:
Dies ist die Verwirklichung dieser Idee.
package main import ( "sync" ) // conn is abstraction of a connection to make manager easy to test. type conn interface { close() error } // dialer is abstraction of a dialer to make manager easy to test. type dialer interface { dial(addr string) (conn, error) } type manager struct { // mucurrent protects the "current" member. mucurrent sync.rwmutex current int // when current is -1, the manager is shuted down. endpoints []*endpoint // mu protects the whole switch action. mu sync.mutex } func newmanager(dialer dialer, addresses ...string) *manager { if len(addresses) < 2 { panic("a manger should handle at least 2 addresses") } endpoints := make([]*endpoint, len(addresses)) for i, addr := range addresses { endpoints[i] = &endpoint{ address: addr, dialer: dialer, } } return &manager{ endpoints: endpoints, } } func (m *manager) addproxy(from conn) { // 1. addproxy will wait when the write lock of m.mucurrent is taken. // once the write lock is released, addproxy will connect to the new endpoint. // switch only holds the write lock for a short time, and switch is called // not so frequently, so addproxy won't wait too much. // 2. switch will wait if there is any addproxy holding the read lock of // m.mucurrent. that means switch waits longer. the advantage is that when // e.clear is called in switch, all addproxy requests to the old endpoint // are done. so it's safe to call e.clear then. m.mucurrent.rlock() defer m.mucurrent.runlock() current := m.current // do not accept any new connection when m has been shutdown. if current == -1 { from.close() return } m.endpoints[current].addproxy(from) } func (m *manager) switch() { // in a real world, switch is called not so frequently. // so it's ok to add a lock here. // and it's necessary to make sure the old endpoint is cleared and ready // for use in the future. m.mu.lock() defer m.mu.unlock() // take the write lock of m.mucurrent. // it waits for all the addproxy requests holding the read lock to finish. m.mucurrent.lock() old := m.current // do nothing when m has been shutdown. if old == -1 { m.mucurrent.unlock() return } next := old + 1 if next >= len(m.endpoints) { next = 0 } m.current = next m.mucurrent.unlock() // when it reaches here, all addproxy requests to the old endpoint are done. // and it's safe to call e.clear now. m.endpoints[old].clear() } func (m *manager) shutdown() { m.mu.lock() defer m.mu.unlock() m.mucurrent.lock() current := m.current m.current = -1 m.mucurrent.unlock() m.endpoints[current].clear() } type proxy struct { from, to conn } type endpoint struct { address string dialer dialer mu sync.mutex proxies []*proxy } func (e *endpoint) clear() { for _, p := range e.proxies { p.from.close() p.to.close() } // assign a new slice to e.proxies, and the gc will collect the old one. e.proxies = []*proxy{} } func (e *endpoint) addproxy(from conn) { toconn, err := e.dialer.dial(e.address) if err != nil { // close the from connection so that the client will reconnect? from.close() return } e.mu.lock() defer e.mu.unlock() e.proxies = append(e.proxies, &proxy{from: from, to: toconn}) }
Diese Demo zeigt, wie Sie den zuvor implementierten ManagerTyp:
verwendenpackage main import ( "net" "time" ) type realdialer struct{} func (d realdialer) dial(addr string) (conn, error) { tcpaddr, err := net.resolvetcpaddr("tcp4", addr) if err != nil { return nil, err } return net.dialtcp("tcp", nil, tcpaddr) } func main() { manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8") tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { for range time.tick(30 * time.second) { manager.switch() } }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go manager.addproxy(clientconn) } }
Führen Sie den Test mit dem folgenden Befehl aus: go test ./... -race -count 10
package main import ( "errors" "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/google/uuid" ) func TestManager(t *testing.T) { addresses := []string{"1.1.1.1", "8.8.8.8"} dialer := newDialer(addresses...) manager := NewManager(dialer, addresses...) ch := make(chan int, 1) var wg sync.WaitGroup wg.Add(1) go func() { for range ch { manager.Switch() } wg.Done() }() count := 1000 total := count * 10 wg.Add(total) fromConn := &fakeFromConn{} for i := 0; i < total; i++ { if i%count == count-1 { ch <- 0 } go func() { manager.AddProxy(fromConn) wg.Done() }() } close(ch) wg.Wait() manager.Shutdown() for _, s := range dialer.servers { left := len(s.conns) if left != 0 { t.Errorf("server %s, unexpected connections left: %d", s.addr, left) } } closedCount := fromConn.closedCount.Load() if closedCount != int32(total) { t.Errorf("want closed count: %d, got: %d", total, closedCount) } } type fakeFromConn struct { closedCount atomic.Int32 } func (c *fakeFromConn) Close() error { c.closedCount.Add(1) return nil } type fakeToConn struct { id uuid.UUID server *fakeServer } func (c *fakeToConn) Close() error { if c.id == uuid.Nil { return nil } c.server.removeConn(c.id) return nil } type fakeServer struct { addr string mu sync.Mutex conns map[uuid.UUID]bool } func (s *fakeServer) addConn() (uuid.UUID, error) { s.mu.Lock() defer s.mu.Unlock() id, err := uuid.NewRandom() if err == nil { s.conns[id] = true } return id, err } func (s *fakeServer) removeConn(id uuid.UUID) { s.mu.Lock() defer s.mu.Unlock() delete(s.conns, id) } type fakeDialer struct { servers map[string]*fakeServer } func newDialer(addresses ...string) *fakeDialer { servers := make(map[string]*fakeServer) for _, addr := range addresses { servers[addr] = &fakeServer{ addr: addr, conns: make(map[uuid.UUID]bool), } } return &fakeDialer{ servers: servers, } } func (d *fakeDialer) Dial(addr string) (Conn, error) { n := rand.Intn(100) if n == 0 { return nil, errors.New("fake network error") } // Simulate network latency. time.Sleep(time.Duration(n) * time.Millisecond) s := d.servers[addr] id, err := s.addConn() if err != nil { return nil, err } conn := &fakeToConn{ id: id, server: s, } return conn, nil }
Das obige ist der detaillierte Inhalt vonSynchronisierungsprobleme in diesem Golang-Programm. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!