PHP editor Apple today brings you an interesting topic: "Synchronization issues in this Golang program". When writing concurrent programs we often run into synchronization problems, that is, competition and coordination between multiple goroutines. As a language designed for concurrency, Golang provides a rich set of synchronization mechanisms and tools, but there are still some common synchronization pitfalls that deserve attention. This article walks through one such problem in detail and gives a corresponding solution, to help you better understand and handle synchronization challenges in Golang. Let's explore together!
Question content
I'm trying to create a program that acts as a proxy server and can dynamically switch to a new endpoint. But I ran into a problem: after switchoverToNewEndpoint() is called, some proxy objects are still connected to the original endpoint 8.8.8.8, even though they should have been closed.
package main import ( "net" "sync" "sync/atomic" "time" ) type proxy struct { id int32 from, to *net.tcpconn } var switchover int32 = 0 func setswitchover() { atomic.storeint32((*int32)(&switchover), 1) } func switchoverenabled() bool { return atomic.loadint32((*int32)(&switchover)) == 1 } var proxies map[int32]*proxy = make(map[int32]*proxy, 0) var proxyseq int32 = 0 var mu sync.rwmutex func addproxy(from *net.tcpconn) { mu.lock() proxyseq += 1 proxy := &proxy{id: proxyseq, from: from} proxies[proxyseq] = proxy mu.unlock() var toaddr string if switchoverenabled() { toaddr = "1.1.1.1" } else { toaddr = "8.8.8.8" } tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr) toconn, err := net.dialtcp("tcp", nil, tcpaddr) if err != nil { panic(err) } proxy.to = toconn } func switchovertonewendpoint() { mu.rlock() closedproxies := proxies mu.runlock() setswitchover() for _, proxy := range closedproxies { proxy.from.close() proxy.to.close() mu.lock() delete(proxies, proxy.id) mu.unlock() } } func main() { tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { time.sleep(time.second * 30) switchovertonewendpoint() }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go addproxy(clientconn) } }
After thinking about it for a while, I guess the problem lies in:

mu.RLock()
closedProxies := proxies
mu.RUnlock()

But I'm not sure whether this is the root cause, or whether it can be fixed by replacing it with the following:

closedProxies := make([]*Proxy, 0)
mu.RLock()
for _, proxy := range proxies {
    closedProxies = append(closedProxies, proxy)
}
mu.RUnlock()

Since this case is difficult to reproduce, can anyone with experience offer ideas or tips? Any comments are welcome. Thanks in advance.
Solution
Problem
The change is necessary. In the original implementation, closedProxies holds the same map as proxies. See this demo:
package main import "fmt" func main() { proxies := make(map[int]int, 0) for i := 0; i < 10; i++ { proxies[i] = i } closeproxies := proxies proxies[10] = 10 proxies[11] = 11 for k := range closeproxies { delete(proxies, k) } fmt.printf("items left: %d\n", len(proxies)) // output: // items left: 0 }
But this is not the root cause. New proxies can be added after closedProxies is copied but before setSwitchover is called. In that case, the new proxy connects to the old address, yet it is not in closedProxies. I think this is the root cause.
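To make that window concrete, here is a minimal, single-goroutine re-enactment of the interleaving. This is not code from the question or the answer: the map holds plain strings instead of connections, and the two goroutines' steps are simply written out in the problematic order so the outcome is deterministic.

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var switchover int32
    proxies := map[int32]string{}

    // switchoverToNewEndpoint, step 1: snapshot the current proxies.
    // The snapshot is empty because nothing has been added yet.
    closedProxies := make(map[int32]string, len(proxies))
    for k, v := range proxies {
        closedProxies[k] = v
    }

    // addProxy, running concurrently: register a new proxy and pick the
    // target address. The flag is still 0, so the old endpoint is chosen.
    proxies[1] = "pending"
    addr := "8.8.8.8"
    if atomic.LoadInt32(&switchover) == 1 {
        addr = "1.1.1.1"
    }

    // switchoverToNewEndpoint, step 2: set the flag and close everything in
    // the snapshot. Proxy 1 is not in the snapshot, so it is never closed.
    atomic.StoreInt32(&switchover, 1)
    for k := range closedProxies {
        delete(proxies, k)
    }

    fmt.Printf("proxy 1 dials %s and stays behind (%d proxy left in the map)\n", addr, len(proxies))
    // Output:
    // proxy 1 dials 8.8.8.8 and stays behind (1 proxy left in the map)
}

Note that even with the proposed copy-into-a-slice fix, this interleaving is still possible, because taking the snapshot and calling setSwitchover are not performed atomically.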
There is still another issue: a new proxy is added to proxies before its to field is set. The program may try to close this proxy before the to field has been set, causing a panic.
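Here is a minimal sketch of that second window. Networking is skipped entirely, so the connection fields stay nil; the Proxy type and map mirror the question's code, and the point is only that the to field has not been set while the proxy is already visible in the map.

package main

import (
    "fmt"
    "net"
    "sync"
)

type Proxy struct {
    id       int32
    from, to *net.TCPConn
}

func main() {
    proxies := make(map[int32]*Proxy)
    var mu sync.RWMutex

    // addProxy, first half: the proxy is registered before the endpoint has
    // been dialed, so its to field is still nil. (from would be the accepted
    // client connection in the real program; it stays nil here only because
    // this sketch does no networking.)
    mu.Lock()
    proxies[1] = &Proxy{id: 1}
    mu.Unlock()

    // If switchoverToNewEndpoint runs at this moment, it ranges over the map
    // and calls proxy.to.Close() on a proxy whose to is still nil, which is
    // where the panic described above comes from.
    mu.RLock()
    for _, p := range proxies {
        fmt.Printf("proxy %d: to == nil? %v (closing it now would panic)\n", p.id, p.to == nil)
    }
    mu.RUnlock()
}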
Reliable Design
The idea is to put all the endpoints into a slice and let each endpoint manage its own proxy list, so we only need to keep track of the index of the current endpoint. When we want to switch to another endpoint, we change the index and tell the stale endpoint to clear its proxies. The only remaining complication is making sure that a stale endpoint really clears all of its proxies, including ones that were being added while the switch happened. See the implementation below:
manager.go
This is the implementation of that idea.
package main import ( "sync" ) // conn is abstraction of a connection to make manager easy to test. type conn interface { close() error } // dialer is abstraction of a dialer to make manager easy to test. type dialer interface { dial(addr string) (conn, error) } type manager struct { // mucurrent protects the "current" member. mucurrent sync.rwmutex current int // when current is -1, the manager is shuted down. endpoints []*endpoint // mu protects the whole switch action. mu sync.mutex } func newmanager(dialer dialer, addresses ...string) *manager { if len(addresses) < 2 { panic("a manger should handle at least 2 addresses") } endpoints := make([]*endpoint, len(addresses)) for i, addr := range addresses { endpoints[i] = &endpoint{ address: addr, dialer: dialer, } } return &manager{ endpoints: endpoints, } } func (m *manager) addproxy(from conn) { // 1. addproxy will wait when the write lock of m.mucurrent is taken. // once the write lock is released, addproxy will connect to the new endpoint. // switch only holds the write lock for a short time, and switch is called // not so frequently, so addproxy won't wait too much. // 2. switch will wait if there is any addproxy holding the read lock of // m.mucurrent. that means switch waits longer. the advantage is that when // e.clear is called in switch, all addproxy requests to the old endpoint // are done. so it's safe to call e.clear then. m.mucurrent.rlock() defer m.mucurrent.runlock() current := m.current // do not accept any new connection when m has been shutdown. if current == -1 { from.close() return } m.endpoints[current].addproxy(from) } func (m *manager) switch() { // in a real world, switch is called not so frequently. // so it's ok to add a lock here. // and it's necessary to make sure the old endpoint is cleared and ready // for use in the future. m.mu.lock() defer m.mu.unlock() // take the write lock of m.mucurrent. // it waits for all the addproxy requests holding the read lock to finish. m.mucurrent.lock() old := m.current // do nothing when m has been shutdown. if old == -1 { m.mucurrent.unlock() return } next := old + 1 if next >= len(m.endpoints) { next = 0 } m.current = next m.mucurrent.unlock() // when it reaches here, all addproxy requests to the old endpoint are done. // and it's safe to call e.clear now. m.endpoints[old].clear() } func (m *manager) shutdown() { m.mu.lock() defer m.mu.unlock() m.mucurrent.lock() current := m.current m.current = -1 m.mucurrent.unlock() m.endpoints[current].clear() } type proxy struct { from, to conn } type endpoint struct { address string dialer dialer mu sync.mutex proxies []*proxy } func (e *endpoint) clear() { for _, p := range e.proxies { p.from.close() p.to.close() } // assign a new slice to e.proxies, and the gc will collect the old one. e.proxies = []*proxy{} } func (e *endpoint) addproxy(from conn) { toconn, err := e.dialer.dial(e.address) if err != nil { // close the from connection so that the client will reconnect? from.close() return } e.mu.lock() defer e.mu.unlock() e.proxies = append(e.proxies, &proxy{from: from, to: toconn}) }
main.go
This demo shows how to use the Manager type implemented above:
package main import ( "net" "time" ) type realdialer struct{} func (d realdialer) dial(addr string) (conn, error) { tcpaddr, err := net.resolvetcpaddr("tcp4", addr) if err != nil { return nil, err } return net.dialtcp("tcp", nil, tcpaddr) } func main() { manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8") tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { for range time.tick(30 * time.second) { manager.switch() } }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go manager.addproxy(clientconn) } }
manager_test.go
Run the test using the following command: go test ./... -race -count 10
package main import ( "errors" "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/google/uuid" ) func TestManager(t *testing.T) { addresses := []string{"1.1.1.1", "8.8.8.8"} dialer := newDialer(addresses...) manager := NewManager(dialer, addresses...) ch := make(chan int, 1) var wg sync.WaitGroup wg.Add(1) go func() { for range ch { manager.Switch() } wg.Done() }() count := 1000 total := count * 10 wg.Add(total) fromConn := &fakeFromConn{} for i := 0; i < total; i++ { if i%count == count-1 { ch <- 0 } go func() { manager.AddProxy(fromConn) wg.Done() }() } close(ch) wg.Wait() manager.Shutdown() for _, s := range dialer.servers { left := len(s.conns) if left != 0 { t.Errorf("server %s, unexpected connections left: %d", s.addr, left) } } closedCount := fromConn.closedCount.Load() if closedCount != int32(total) { t.Errorf("want closed count: %d, got: %d", total, closedCount) } } type fakeFromConn struct { closedCount atomic.Int32 } func (c *fakeFromConn) Close() error { c.closedCount.Add(1) return nil } type fakeToConn struct { id uuid.UUID server *fakeServer } func (c *fakeToConn) Close() error { if c.id == uuid.Nil { return nil } c.server.removeConn(c.id) return nil } type fakeServer struct { addr string mu sync.Mutex conns map[uuid.UUID]bool } func (s *fakeServer) addConn() (uuid.UUID, error) { s.mu.Lock() defer s.mu.Unlock() id, err := uuid.NewRandom() if err == nil { s.conns[id] = true } return id, err } func (s *fakeServer) removeConn(id uuid.UUID) { s.mu.Lock() defer s.mu.Unlock() delete(s.conns, id) } type fakeDialer struct { servers map[string]*fakeServer } func newDialer(addresses ...string) *fakeDialer { servers := make(map[string]*fakeServer) for _, addr := range addresses { servers[addr] = &fakeServer{ addr: addr, conns: make(map[uuid.UUID]bool), } } return &fakeDialer{ servers: servers, } } func (d *fakeDialer) Dial(addr string) (Conn, error) { n := rand.Intn(100) if n == 0 { return nil, errors.New("fake network error") } // Simulate network latency. time.Sleep(time.Duration(n) * time.Millisecond) s := d.servers[addr] id, err := s.addConn() if err != nil { return nil, err } conn := &fakeToConn{ id: id, server: s, } return conn, nil }
The above is the detailed content of Synchronization issues in this Golang program. For more information, please follow other related articles on the PHP Chinese website!
