go同步機制有:1、channel,著重並發問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並發;2、Sync.Mutex,擁有Lock、Unlock兩個方法,主要實現思想體現在Lock函數中;3、Sync.waitGroup;4、Sync.Once;5、Sync.context;6、Sync.pool;7、atomic包,針對變數進行操作。
本教學操作環境:windows7系統、GO 1.18版本、Dell G3電腦。
Golang的提供的同步機制有sync模組下的Mutex、WaitGroup以及語言本身提供的chan等。
概述
#Golang以如此明顯的方式告訴我們:。
優點:channel的核心是資料流動,關注到並發問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並發
問題,而且使用channel是線程安全的並且不會有資料衝突,比鎖好用多了
##缺點:不太適應同步太複雜的場景,例如多協程的同步等待問題,而且有死鎖問題 ,channel死鎖問題:死鎖問題連結
分類
channel類型:無緩衝和緩衝類型 channel有兩種形式的,一種是無緩衝的,一個線程向這個channel發送了訊息後,會阻塞當前的這個線程,知道其他線程去接收這個channel的消息。無緩衝的形式如下:
intChan := make(chan int) 带缓冲的channel,是可以指定缓冲的消息数量,当消息数量小于指定值时,不会出现阻塞,超过之后才会阻塞,需要等待其他线程去接收channel处理,带缓冲的形式如下: //3为缓冲数量 intChan := make(chan int, 3)
舉例
type Person struct { Name string Age uint8 Address Addr } type Addr struct { city string district string } /* 测试channel传输复杂的Struct数据 */ func testTranslateStruct() { personChan := make(chan Person, 1) person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}} personChan <- person person.Address = Addr{"guangzhou", "huadu"} fmt.Printf("src person : %+v \n", person) newPerson := <-personChan fmt.Printf("new person : %+v \n", newPerson) }在實際應用過程中,等待channel 結束訊號的過程可能不是無期限的,一般會伴隨一個timer ,超時時間如下面所示:
/* 检查channel读写超时,并做超时的处理 */ func testTimeout() { g := make(chan int) quit := make(chan bool) go func() { for { select { case v := <-g: fmt.Println(v) case <-time.After(time.Second * time.Duration(3)): quit <- true fmt.Println("超时,通知主线程退出") return } } }() for i := 0; i < 3; i++ { g <- i } <-quit fmt.Println("收到退出通知,主线程退出") }
func coordinateWithChan() { sign := make(chan struct{}, 2) num := int32(0) fmt.Printf("The number: %d [with chan struct{}]\n", num) max := int32(10) go addNum(&num, 1, max, func() { sign <- struct{}{} }) go addNum(&num, 2, max, func() { sign <- struct{}{} }) <-sign <-sign }
所以Sync.waitGroup 就顯得更優雅,Sync.waitGroup 用來等待一組goroutines的結束,在主Goroutine裡聲明,並且設置要等待的goroutine的個數,每個goroutine執行完成之後呼叫Done,最後在主Goroutines 裡Wait即可。類似於JAVA中的CountDownLatch或循環屏障,並且Sync.waitGroup可以被重複使用,提供瞭如下API:
func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait()
但是Sync.waitGroup的使用需要遵循一些規則,避免拋出Panic:
a. 錯誤調用Done方法,導致waitGroup內部計數值出現負數的情況
b.錯誤的調用Add方法,在waitGroup內部計數值到達0的時候,Add方法被調用,導致應該被喚起的goroutine沒有被喚起,就開始了新的一輪計數週期
所以在調用的時候,就要遵循一下原則:
先統一Add,再並發Done,最後Wait4. Sync.Once
使用:
func main() { var once sync.Once onceBody := func() { time.Sleep(3e9) fmt.Println("Only once") } done := make(chan bool) for i := 0; i < 10; i++ { j := i go func(int) { once.Do(onceBody) fmt.Println(j) done <- true }(j) } //给一部分时间保证能够输出完整【方法一】 //for i := 0; i < 10; i++ { // <-done //} //给一部分时间保证能够输出完整【方法二】 <-done time.Sleep(3e9) }5. Sync.context
當需要進行多批次的計算任務同步,或是需要一對多的協作流程的時候
#使用範例func coordinateWithContext() {
total := 12
var num int32
fmt.Printf("The number: %d [with context.Context]\n", num)
cxt, cancelFunc := context.WithCancel(context.Background())
for i := 1; i <= total; i++ {
go addNum(&num, i, func() {
if atomic.LoadInt32(&num) == int32(total) {
cancelFunc()
}
})
}
<-cxt.Done()
fmt.Println("End.")
}
# a.如何生成自己的context 通过WithCancel、WithDeadline、WithTimeout和WithValue四个方法从context.Background中派生出自己的子context 注意context.background这个上下文根节点仅仅是一个最基本的支点,它不提供任何额外的功能,也就是说,它既不可以被撤销(cancel),也不能携带任何数据,在使用是必须通过以上4种方法派生出自己的context b.子context是会继承父context的值 c.撤销消息的传播 撤销消息会按照深度遍历的方式传播给子context(注意因为多routine调用的原因,最终的撤销顺序可能不会是深度遍历的顺序) ,在遍历的过程中,通过WithCancel、WithDeadline、WithTimeout派生的context会被撤销,但是通过WithValue方法派生的context不会被撤销 我们调用sync/atomic中的几个函数可以对几种简单的类型进行原子操作。这些类型包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer,共6个。这些函数的原子操作共有5种:增或减,比较并交换、载入、存储和交换它们提供了不同的功能,切使用的场景也有区别。 增或减 顾名思义,原子增或减即可实现对被操作值的增大或减少。因此该操作只能操作数值类型。 被用于进行增或减的原子操作都是以“Add”为前缀,并后面跟针对具体类型的名称。 增 栗子:(在原来的基础上加n) 减 栗子:(在原来的基础上加n(n为负数)) 比较并交换 比较并交换----Compare And Swap 简称CAS 他是假设被操作的值未曾被改变(即与旧值相等),并一旦确定这个假设的真实性就立即进行值替换 如果想安全的并发一些类型的值,我们总是应该优先使用CAS 栗子:(如果addr和old相同,就用new代替addr) 载入 如果一个写操作未完成,有一个读操作就已经发生了,这样读操作使很糟糕的。 为了原子的读取某个值sync/atomic代码包同样为我们提供了一系列的函数。这些函数都以"Load"为前缀,意为载入。 栗子 存储 与读操作对应的是写入操作,sync/atomic也提供了与原子的值载入函数相对应的原子的值存储函数。这些函数的名称均以“Store”为前缀 在原子的存储某个值的过程中,任何cpu都不会进行针对进行同一个值的读或写操作。如果我们把所有针对此值的写操作都改为原子操作,那么就不会出现针对此值的读操作读操作因被并发的进行而读到修改了一半的情况。 原子操作总会成功,因为他不必关心被操作值的旧值是什么。 栗子 交换 原子交换操作,这类函数的名称都以“Swap”为前缀。 与CAS不同,交换操作直接赋予新值,不管旧值。 会返回旧值 栗子 1. 什么是Sync包? Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication. Values containing the types defined in this package should not be copied. 这句话大意是说: 从描述中可以看到的是,golang 并不推荐这个包中的大多数并发控制方法,但还是提供了相关方法,主要原因是golang中提倡以共享内存的方式来通信: 不要以共享内存的方式来通信,作为替代,我们应该以通信的手段来共享内存 共享内存的方式使得多线程中的通信变得简单,但是在并发的安全性控制上将变得异常繁琐。 2. 包中的Type 包中主要有: Locker, Cond, Map, Mutex, Once, Pool, 3. 什么是锁,为什么需要锁? 锁是sync包中的核心,他主要有两个方法,加锁和解锁。 可以看到的是,A地址的数字被执行了两次自增,若A=5,我们在执行完成后预期的A值是7,但是在这种情况下我们得到的A却是6,bug了~ 4 写更优雅的代码 在很多语言中我们经常为了保证数据安全正确,会在并发的时候对数据加锁 Golang在此包中也提供了相关的锁,但是标明了"most are intended for use by low-level library routines" 所以我这里只对 Once and WaitGroup types做简述。 5.Once 对象 Once 是一个可以被多次调用但是只执行一次,若每次调用Do时传入参数f不同,但是只有第一个才会被执行。 如果你执行这段代码会发现,虽然调用了10次,但是只执行了1次。BTW:这个东西可以用来写单例。 6. WaitGroup 下面是个官方的例子: 7. 简述 Golang中高级的并发可以通过channel来实现,这是golang所倡导的,但是go也提供了锁等先关操作。6. Sync.pool
7.atomic包,针对变量进行操作
//方法源码
func AddUint32(addr *uint32, delta uint32) (new uint32)
atomic.AddUint32(&addr,n)
atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))
//方法源码
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
ok:=atomic.CompareAndSwapInt32(&addr,old,new)
//方法源码
func LoadInt32(addr *int32) (val int32)
fun addValue(delta int32){
for{
v:=atomic.LoadInt32(&addr)
if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
break;
}
}
}
//方法源码
func StoreInt32(addr *int32, val int32)
atomic.StoreInt32(被操作值的指针,新值)
atomic.StoreInt32(&value,newaddr)
//方法源码
func SwapInt32(addr *int32, new int32) (old int32)
atomic.SwapInt32(被操作值的指针,新值)(返回旧值)
oldval:=atomic.StoreInt32(&value,newaddr)
扩展知识:Sync包简述
Sync包同步提供基本的同步原语,如互斥锁。 除了Once和WaitGroup类型之外,大多数类型都是供低级库例程使用的。 通过Channel和沟通可以更好地完成更高级别的同步。并且此包中的值在使用过后不要拷贝。
正确性不是我们唯一想要的,我们想要的还有系统的可伸缩性,以及可理解性,我觉得这点非常重要,比如现在广泛使用的Raft算法。
RWMutex, WaitGrouptype Locker interface {
Lock()
Unlock()
}
type Cond struct {
// L is held while observing or changing the condition
L Locker
}
在单线程运行的时候程序是顺序执行的,程序对数据的访问也是:
读取 => 一顿操作(加减乘除之类的) => 写回原地址
但是一旦程序中进行了并发编程,也就是说,某一个函数可能同时被不同的线程执行的时候,以时间为维度会发生以下情况:
还有很多类似的并发错误,所以才有锁的引入。若是我们在线程2读取A的值的时候对A进行加锁,让线程2等待,线程1执行完成之后在执行线程2,这样就能够保证数据的正确性。但是正确性不是我们唯一想要的。Lock()
doSomething()
Unlock()
func (o *Once) Do(f func())
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
以上是go語言同步機制有哪些的詳細內容。更多資訊請關注PHP中文網其他相關文章!