Maison >développement back-end >Golang >Problèmes de synchronisation dans ce programme Golang
L'éditeur PHP Apple vous présentera aujourd'hui un sujet intéressant - "Problèmes de synchronisation dans ce programme Golang". Lors de l'écriture de programmes concurrents, nous rencontrons souvent des problèmes de synchronisation, c'est-à-dire de compétition et de coordination entre plusieurs threads. En tant que langage de programmation concurrent, Golang fournit une multitude de mécanismes et d'outils de synchronisation, mais il existe également certains problèmes de synchronisation courants qui nécessitent notre attention et nos solutions. Cet article explorera ces problèmes en détail et proposera les solutions correspondantes pour aider chacun à mieux comprendre et gérer les défis de synchronisation dans Golang. Explorons ensemble !
J'essaie de créer un programme qui agit comme un serveur proxy et peut basculer dynamiquement vers un nouveau point de terminaison. Mais j'ai rencontré un problème, lors de l'appel switchovertonewendpoint()
后,仍然有一些代理对象连接到原始端点 8.8.8.8
, il devrait être éteint.
package main import ( "net" "sync" "sync/atomic" "time" ) type proxy struct { id int32 from, to *net.tcpconn } var switchover int32 = 0 func setswitchover() { atomic.storeint32((*int32)(&switchover), 1) } func switchoverenabled() bool { return atomic.loadint32((*int32)(&switchover)) == 1 } var proxies map[int32]*proxy = make(map[int32]*proxy, 0) var proxyseq int32 = 0 var mu sync.rwmutex func addproxy(from *net.tcpconn) { mu.lock() proxyseq += 1 proxy := &proxy{id: proxyseq, from: from} proxies[proxyseq] = proxy mu.unlock() var toaddr string if switchoverenabled() { toaddr = "1.1.1.1" } else { toaddr = "8.8.8.8" } tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr) toconn, err := net.dialtcp("tcp", nil, tcpaddr) if err != nil { panic(err) } proxy.to = toconn } func switchovertonewendpoint() { mu.rlock() closedproxies := proxies mu.runlock() setswitchover() for _, proxy := range closedproxies { proxy.from.close() proxy.to.close() mu.lock() delete(proxies, proxy.id) mu.unlock() } } func main() { tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { time.sleep(time.second * 30) switchovertonewendpoint() }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go addproxy(clientconn) } }
Après y avoir réfléchi un moment, je suppose que le problème est
mu.rlock() closedproxies := proxies mu.runlock()
Mais je ne sais pas si c'est la cause première et si elle peut être corrigée en la remplaçant par ce qui suit :
closedProxies := make([]*Proxy, 0) mu.RLock() for _, proxy := range proxies { closedProxies = append(closedProxies, proxy) } mu.RUnlock()
Ce cas étant difficile à reproduire, des professionnels peuvent-ils apporter des idées ou des conseils ? Tous les commentaires sont les bienvenus. Merci d'avance.
Le changement est nécessaire. Dans l'implémentation originale, latedproxies
contenait le même mappage. Voir cette démo :
package main import "fmt" func main() { proxies := make(map[int]int, 0) for i := 0; i < 10; i++ { proxies[i] = i } closeproxies := proxies proxies[10] = 10 proxies[11] = 11 for k := range closeproxies { delete(proxies, k) } fmt.printf("items left: %d\n", len(proxies)) // output: // items left: 0 }
Mais ce n’est pas la cause profonde. Peut être copié closeproxies
之后但在调用 setswitchover
之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies
. Je pense que c'est la cause profonde.
Encore une question. Fermez ce proxy avant de définir le champ to
字段之前,将向 proxies
添加新代理。程序可能希望在设置 to
, provoquant une panique.
L'idée est de regrouper tous les points de terminaison dans une tranche et de laisser chaque point de terminaison gérer sa propre liste de proxy. Il nous suffit donc de garder une trace de l'index du point de terminaison actuel. Lorsque nous voulons passer à un autre point de terminaison, nous modifions simplement l'index et demandons au point de terminaison obsolète d'effacer son proxy. La seule complication restante consiste à garantir que les points de terminaison obsolètes effacent tous leurs proxys. Voir la mise en œuvre ci-dessous :
C'est la concrétisation de cette idée.
package main import ( "sync" ) // conn is abstraction of a connection to make manager easy to test. type conn interface { close() error } // dialer is abstraction of a dialer to make manager easy to test. type dialer interface { dial(addr string) (conn, error) } type manager struct { // mucurrent protects the "current" member. mucurrent sync.rwmutex current int // when current is -1, the manager is shuted down. endpoints []*endpoint // mu protects the whole switch action. mu sync.mutex } func newmanager(dialer dialer, addresses ...string) *manager { if len(addresses) < 2 { panic("a manger should handle at least 2 addresses") } endpoints := make([]*endpoint, len(addresses)) for i, addr := range addresses { endpoints[i] = &endpoint{ address: addr, dialer: dialer, } } return &manager{ endpoints: endpoints, } } func (m *manager) addproxy(from conn) { // 1. addproxy will wait when the write lock of m.mucurrent is taken. // once the write lock is released, addproxy will connect to the new endpoint. // switch only holds the write lock for a short time, and switch is called // not so frequently, so addproxy won't wait too much. // 2. switch will wait if there is any addproxy holding the read lock of // m.mucurrent. that means switch waits longer. the advantage is that when // e.clear is called in switch, all addproxy requests to the old endpoint // are done. so it's safe to call e.clear then. m.mucurrent.rlock() defer m.mucurrent.runlock() current := m.current // do not accept any new connection when m has been shutdown. if current == -1 { from.close() return } m.endpoints[current].addproxy(from) } func (m *manager) switch() { // in a real world, switch is called not so frequently. // so it's ok to add a lock here. // and it's necessary to make sure the old endpoint is cleared and ready // for use in the future. m.mu.lock() defer m.mu.unlock() // take the write lock of m.mucurrent. // it waits for all the addproxy requests holding the read lock to finish. m.mucurrent.lock() old := m.current // do nothing when m has been shutdown. if old == -1 { m.mucurrent.unlock() return } next := old + 1 if next >= len(m.endpoints) { next = 0 } m.current = next m.mucurrent.unlock() // when it reaches here, all addproxy requests to the old endpoint are done. // and it's safe to call e.clear now. m.endpoints[old].clear() } func (m *manager) shutdown() { m.mu.lock() defer m.mu.unlock() m.mucurrent.lock() current := m.current m.current = -1 m.mucurrent.unlock() m.endpoints[current].clear() } type proxy struct { from, to conn } type endpoint struct { address string dialer dialer mu sync.mutex proxies []*proxy } func (e *endpoint) clear() { for _, p := range e.proxies { p.from.close() p.to.close() } // assign a new slice to e.proxies, and the gc will collect the old one. e.proxies = []*proxy{} } func (e *endpoint) addproxy(from conn) { toconn, err := e.dialer.dial(e.address) if err != nil { // close the from connection so that the client will reconnect? from.close() return } e.mu.lock() defer e.mu.unlock() e.proxies = append(e.proxies, &proxy{from: from, to: toconn}) }
Cette démo montre comment utiliser le managerType :
précédemment implémentépackage main import ( "net" "time" ) type realdialer struct{} func (d realdialer) dial(addr string) (conn, error) { tcpaddr, err := net.resolvetcpaddr("tcp4", addr) if err != nil { return nil, err } return net.dialtcp("tcp", nil, tcpaddr) } func main() { manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8") tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { for range time.tick(30 * time.second) { manager.switch() } }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go manager.addproxy(clientconn) } }
Exécutez le test en utilisant la commande suivante : go test ./... -race -count 10
package main import ( "errors" "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/google/uuid" ) func TestManager(t *testing.T) { addresses := []string{"1.1.1.1", "8.8.8.8"} dialer := newDialer(addresses...) manager := NewManager(dialer, addresses...) ch := make(chan int, 1) var wg sync.WaitGroup wg.Add(1) go func() { for range ch { manager.Switch() } wg.Done() }() count := 1000 total := count * 10 wg.Add(total) fromConn := &fakeFromConn{} for i := 0; i < total; i++ { if i%count == count-1 { ch <- 0 } go func() { manager.AddProxy(fromConn) wg.Done() }() } close(ch) wg.Wait() manager.Shutdown() for _, s := range dialer.servers { left := len(s.conns) if left != 0 { t.Errorf("server %s, unexpected connections left: %d", s.addr, left) } } closedCount := fromConn.closedCount.Load() if closedCount != int32(total) { t.Errorf("want closed count: %d, got: %d", total, closedCount) } } type fakeFromConn struct { closedCount atomic.Int32 } func (c *fakeFromConn) Close() error { c.closedCount.Add(1) return nil } type fakeToConn struct { id uuid.UUID server *fakeServer } func (c *fakeToConn) Close() error { if c.id == uuid.Nil { return nil } c.server.removeConn(c.id) return nil } type fakeServer struct { addr string mu sync.Mutex conns map[uuid.UUID]bool } func (s *fakeServer) addConn() (uuid.UUID, error) { s.mu.Lock() defer s.mu.Unlock() id, err := uuid.NewRandom() if err == nil { s.conns[id] = true } return id, err } func (s *fakeServer) removeConn(id uuid.UUID) { s.mu.Lock() defer s.mu.Unlock() delete(s.conns, id) } type fakeDialer struct { servers map[string]*fakeServer } func newDialer(addresses ...string) *fakeDialer { servers := make(map[string]*fakeServer) for _, addr := range addresses { servers[addr] = &fakeServer{ addr: addr, conns: make(map[uuid.UUID]bool), } } return &fakeDialer{ servers: servers, } } func (d *fakeDialer) Dial(addr string) (Conn, error) { n := rand.Intn(100) if n == 0 { return nil, errors.New("fake network error") } // Simulate network latency. time.Sleep(time.Duration(n) * time.Millisecond) s := d.servers[addr] id, err := s.addConn() if err != nil { return nil, err } conn := &fakeToConn{ id: id, server: s, } return conn, nil }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!