Rumah >pembangunan bahagian belakang >Golang >Isu penyegerakan dalam program Golang ini
Editor PHP Apple akan memperkenalkan topik menarik kepada anda hari ini - "Isu penyegerakan dalam program Golang ini". Apabila menulis atur cara serentak, kita sering menghadapi masalah penyegerakan, iaitu persaingan dan penyelarasan antara berbilang benang. Golang, sebagai bahasa pengaturcaraan serentak, menyediakan pelbagai mekanisme dan alatan penyegerakan, tetapi terdapat juga beberapa masalah penyegerakan biasa yang perlu kita perhatikan dan selesaikan. Artikel ini akan meneroka isu ini secara terperinci dan memberikan penyelesaian yang sepadan untuk membantu semua orang memahami dengan lebih baik dan menangani cabaran penyegerakan di Golang. Mari kita meneroka bersama-sama!
Saya cuba mencipta program yang bertindak sebagai pelayan proksi dan boleh bertukar secara dinamik ke titik akhir baharu. Tetapi saya menghadapi masalah, apabila memanggil switchovertonewendpoint()
后,仍然有一些代理对象连接到原始端点 8.8.8.8
, ia harus dimatikan.
package main import ( "net" "sync" "sync/atomic" "time" ) type proxy struct { id int32 from, to *net.tcpconn } var switchover int32 = 0 func setswitchover() { atomic.storeint32((*int32)(&switchover), 1) } func switchoverenabled() bool { return atomic.loadint32((*int32)(&switchover)) == 1 } var proxies map[int32]*proxy = make(map[int32]*proxy, 0) var proxyseq int32 = 0 var mu sync.rwmutex func addproxy(from *net.tcpconn) { mu.lock() proxyseq += 1 proxy := &proxy{id: proxyseq, from: from} proxies[proxyseq] = proxy mu.unlock() var toaddr string if switchoverenabled() { toaddr = "1.1.1.1" } else { toaddr = "8.8.8.8" } tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr) toconn, err := net.dialtcp("tcp", nil, tcpaddr) if err != nil { panic(err) } proxy.to = toconn } func switchovertonewendpoint() { mu.rlock() closedproxies := proxies mu.runlock() setswitchover() for _, proxy := range closedproxies { proxy.from.close() proxy.to.close() mu.lock() delete(proxies, proxy.id) mu.unlock() } } func main() { tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { time.sleep(time.second * 30) switchovertonewendpoint() }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go addproxy(clientconn) } }
Setelah difikirkan seketika, saya rasa masalahnya ialah
mu.rlock() closedproxies := proxies mu.runlock()
Tetapi saya tidak pasti sama ada ini puncanya dan adakah ia boleh diperbaiki dengan menggantikannya dengan perkara berikut:
closedProxies := make([]*Proxy, 0) mu.RLock() for _, proxy := range proxies { closedProxies = append(closedProxies, proxy) } mu.RUnlock()
Memandangkan kes ini sukar untuk dihasilkan semula, bolehkah mana-mana profesional memberikan idea atau petua? Sebarang komen dialu-alukan. Terima kasih terlebih dahulu.
Perubahan itu perlu. Dalam pelaksanaan asal, latedproxies
mengadakan pemetaan yang sama. Lihat demo ini:
package main import "fmt" func main() { proxies := make(map[int]int, 0) for i := 0; i < 10; i++ { proxies[i] = i } closeproxies := proxies proxies[10] = 10 proxies[11] = 11 for k := range closeproxies { delete(proxies, k) } fmt.printf("items left: %d\n", len(proxies)) // output: // items left: 0 }
Tetapi ini bukan puncanya. Boleh ditiru closeproxies
之后但在调用 setswitchover
之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies
. Saya fikir ini adalah punca utama.
Satu lagi soalan. Tutup proksi ini sebelum menetapkan medan to
字段之前,将向 proxies
添加新代理。程序可能希望在设置 to
, menyebabkan panik.
Ideanya adalah untuk meletakkan semua titik akhir ke dalam kepingan dan biarkan setiap titik akhir mengurus senarai proksinya sendiri. Jadi kita hanya perlu menjejaki indeks titik akhir semasa. Apabila kami ingin menukar ke titik akhir yang lain, kami hanya menukar indeks dan memberitahu titik akhir yang lapuk untuk mengosongkan proksinya. Satu-satunya komplikasi yang tinggal ialah memastikan titik akhir yang lapuk mengosongkan semua proksi mereka. Lihat pelaksanaan di bawah:
Ini adalah realisasi idea ini.
package main import ( "sync" ) // conn is abstraction of a connection to make manager easy to test. type conn interface { close() error } // dialer is abstraction of a dialer to make manager easy to test. type dialer interface { dial(addr string) (conn, error) } type manager struct { // mucurrent protects the "current" member. mucurrent sync.rwmutex current int // when current is -1, the manager is shuted down. endpoints []*endpoint // mu protects the whole switch action. mu sync.mutex } func newmanager(dialer dialer, addresses ...string) *manager { if len(addresses) < 2 { panic("a manger should handle at least 2 addresses") } endpoints := make([]*endpoint, len(addresses)) for i, addr := range addresses { endpoints[i] = &endpoint{ address: addr, dialer: dialer, } } return &manager{ endpoints: endpoints, } } func (m *manager) addproxy(from conn) { // 1. addproxy will wait when the write lock of m.mucurrent is taken. // once the write lock is released, addproxy will connect to the new endpoint. // switch only holds the write lock for a short time, and switch is called // not so frequently, so addproxy won't wait too much. // 2. switch will wait if there is any addproxy holding the read lock of // m.mucurrent. that means switch waits longer. the advantage is that when // e.clear is called in switch, all addproxy requests to the old endpoint // are done. so it's safe to call e.clear then. m.mucurrent.rlock() defer m.mucurrent.runlock() current := m.current // do not accept any new connection when m has been shutdown. if current == -1 { from.close() return } m.endpoints[current].addproxy(from) } func (m *manager) switch() { // in a real world, switch is called not so frequently. // so it's ok to add a lock here. // and it's necessary to make sure the old endpoint is cleared and ready // for use in the future. m.mu.lock() defer m.mu.unlock() // take the write lock of m.mucurrent. // it waits for all the addproxy requests holding the read lock to finish. m.mucurrent.lock() old := m.current // do nothing when m has been shutdown. if old == -1 { m.mucurrent.unlock() return } next := old + 1 if next >= len(m.endpoints) { next = 0 } m.current = next m.mucurrent.unlock() // when it reaches here, all addproxy requests to the old endpoint are done. // and it's safe to call e.clear now. m.endpoints[old].clear() } func (m *manager) shutdown() { m.mu.lock() defer m.mu.unlock() m.mucurrent.lock() current := m.current m.current = -1 m.mucurrent.unlock() m.endpoints[current].clear() } type proxy struct { from, to conn } type endpoint struct { address string dialer dialer mu sync.mutex proxies []*proxy } func (e *endpoint) clear() { for _, p := range e.proxies { p.from.close() p.to.close() } // assign a new slice to e.proxies, and the gc will collect the old one. e.proxies = []*proxy{} } func (e *endpoint) addproxy(from conn) { toconn, err := e.dialer.dial(e.address) if err != nil { // close the from connection so that the client will reconnect? from.close() return } e.mu.lock() defer e.mu.unlock() e.proxies = append(e.proxies, &proxy{from: from, to: toconn}) }
Demo ini menunjukkan cara menggunakan pengurusJenis:
yang telah dilaksanakan sebelum inipackage main import ( "net" "time" ) type realdialer struct{} func (d realdialer) dial(addr string) (conn, error) { tcpaddr, err := net.resolvetcpaddr("tcp4", addr) if err != nil { return nil, err } return net.dialtcp("tcp", nil, tcpaddr) } func main() { manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8") tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { for range time.tick(30 * time.second) { manager.switch() } }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go manager.addproxy(clientconn) } }
Jalankan ujian menggunakan arahan berikut: go test ./... -race -count 10
package main import ( "errors" "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/google/uuid" ) func TestManager(t *testing.T) { addresses := []string{"1.1.1.1", "8.8.8.8"} dialer := newDialer(addresses...) manager := NewManager(dialer, addresses...) ch := make(chan int, 1) var wg sync.WaitGroup wg.Add(1) go func() { for range ch { manager.Switch() } wg.Done() }() count := 1000 total := count * 10 wg.Add(total) fromConn := &fakeFromConn{} for i := 0; i < total; i++ { if i%count == count-1 { ch <- 0 } go func() { manager.AddProxy(fromConn) wg.Done() }() } close(ch) wg.Wait() manager.Shutdown() for _, s := range dialer.servers { left := len(s.conns) if left != 0 { t.Errorf("server %s, unexpected connections left: %d", s.addr, left) } } closedCount := fromConn.closedCount.Load() if closedCount != int32(total) { t.Errorf("want closed count: %d, got: %d", total, closedCount) } } type fakeFromConn struct { closedCount atomic.Int32 } func (c *fakeFromConn) Close() error { c.closedCount.Add(1) return nil } type fakeToConn struct { id uuid.UUID server *fakeServer } func (c *fakeToConn) Close() error { if c.id == uuid.Nil { return nil } c.server.removeConn(c.id) return nil } type fakeServer struct { addr string mu sync.Mutex conns map[uuid.UUID]bool } func (s *fakeServer) addConn() (uuid.UUID, error) { s.mu.Lock() defer s.mu.Unlock() id, err := uuid.NewRandom() if err == nil { s.conns[id] = true } return id, err } func (s *fakeServer) removeConn(id uuid.UUID) { s.mu.Lock() defer s.mu.Unlock() delete(s.conns, id) } type fakeDialer struct { servers map[string]*fakeServer } func newDialer(addresses ...string) *fakeDialer { servers := make(map[string]*fakeServer) for _, addr := range addresses { servers[addr] = &fakeServer{ addr: addr, conns: make(map[uuid.UUID]bool), } } return &fakeDialer{ servers: servers, } } func (d *fakeDialer) Dial(addr string) (Conn, error) { n := rand.Intn(100) if n == 0 { return nil, errors.New("fake network error") } // Simulate network latency. time.Sleep(time.Duration(n) * time.Millisecond) s := d.servers[addr] id, err := s.addConn() if err != nil { return nil, err } conn := &fakeToConn{ id: id, server: s, } return conn, nil }
Atas ialah kandungan terperinci Isu penyegerakan dalam program Golang ini. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!