首頁  >  文章  >  後端開發  >  GoLang中協程圖文詳解

GoLang中協程圖文詳解

尚
轉載
2019-11-28 14:14:583909瀏覽

GoLang中協程圖文詳解

協程(coroutine)是Go語言中的輕量級執行緒實現,由Go執行時間(runtime)管理。

在一個函數呼叫前加上go關鍵字,這次呼叫就會在一個新的goroutine中並發執行。當被呼叫的函數回傳時,這個goroutine也自動結束。要注意的是,如果這個函數有回傳值,那麼這個回傳值就會被丟棄。

先看下面的範例:

func Add(x, y int) {
    z := x + y
    fmt.Println(z)
}

func main() {
    for i:=0; i<10; i++ {
        go Add(i, i)
    }
}

執行上面的程式碼,會發現螢幕什麼也沒列印出來,程式就退出了。

對於上面的例子,main()函數啟動了10個goroutine,然後返回,這時程式就退出了,而被啟動的執行Add()的goroutine來不及執行。我們想要讓main()函數等待所有goroutine退出後再返回,但如何知道goroutine都退出了呢?這就引出了多個goroutine之間通信的問題。

在工程上,有兩種最常見的並發通訊模型:共享記憶體和訊息。

來看下面的例子,10個goroutine共享了變數counter,每個goroutine執行完成後,將counter值加1.因為10個goroutine是並發執行的,所以我們還引入了鎖,也就是程式碼中的lock變數。在main()函數中,使用for迴圈來不斷檢查counter值,當其值達到10時,表示所有goroutine都執行完畢了,這時main()返回,程式退出。

package main
import (
    "fmt"
    "sync"
    "runtime"
)

var counter int = 0

func Count(lock *sync.Mutex) {
    lock.Lock()
    counter++
    fmt.Println("counter =", counter)
    lock.Unlock()
}


func main() {

    lock := &sync.Mutex{}

    for i:=0; i<10; i++ {
        go Count(lock)
    }

    for {
        lock.Lock()

        c := counter

        lock.Unlock()

        runtime.Gosched()    // 出让时间片

        if c >= 10 {
            break
        }
    }
}

上面的例子,使用了鎖定變數(屬於一種共享記憶體)來同步協程,事實上Go語言主要使用訊息機制(channel)來作為通訊模型。

channel

訊息機制認為每個並發單元是自包含的、獨立的個體,並且都有自己的變量,但在不同並發單元間這些變數不共享。每個並發單元的輸入和輸出只有一種,那就是訊息。

channel是Go語言在語言層級提供的goroutine間的溝通方式,我們可以使用channel在多個goroutine之間傳遞訊息。 channel是進程內的通訊方式,因此透過channel傳遞物件的過程和呼叫函數時的參數傳遞行為比較一致,例如也可以傳遞指標等。
channel是類型相關的,一個channel只能傳遞一種類型的值,這個類型需要在宣告channel時指定。

channel的宣告形式為:

var chanName chan ElementType

舉個例子,宣告一個傳遞int型別的channel:

var ch chan int

使用內建函數make()定義一個channel :

ch := make(chan int)

在channel的用法中,最常見的包括寫入和讀出:

// 将一个数据value写入至channel,这会导致阻塞,直到有其他goroutine从这个channel中读取数据
ch <- value

// 从channel中读取数据,如果channel之前没有写入数据,也会导致阻塞,直到channel中被写入数据为止
value := <-ch

預設情況下,channel的接收和發送都是阻塞的,除非另一端已準備好。

 我們也可以建立一個有緩衝的channel:

c := make(chan int, 1024)

// 从带缓冲的channel中读数据
for i:=range c {
  ...
}

此時,建立一個大小為1024的int類型的channel,即使沒有讀取方,寫入方也可以一直往channel裡寫入,在緩衝區被填完之前都不會阻塞。

可以關閉不再使用的channel:

close(ch)

應該在生產者的地方關閉channel,如果在消費者的地方關閉,容易引起panic; 

在一個已關閉channel 上執行接收操作(

現在利用channel來重寫上面的例子:

func Count(ch chan int) {
    ch <- 1
    fmt.Println("Counting")
}

func main() {

    chs := make([] chan int, 10)

    for i:=0; i<10; i++ {
        chs[i] = make(chan int)
        go Count(chs[i])
    }

    for _, ch := range(chs) {
        <-ch
    }
}

在這個例子中,定義了一個包含10個channel的數組,並且把數組中的每個channel分配給10個不同的goroutine。在每個goroutine完成後,向goroutine寫入一個數據,在這個channel被讀取前,這個操作是阻塞的。

在所有的goroutine啟動完成後,依序從10個channel讀取數據,在對應的channel寫入資料前,這個操作也是阻塞的。這樣,就用channel實現了類似鎖的功能,並保證了所有goroutine完成後main()才回傳。

另外,我們在將一個channel變數傳遞到一個函數時,可以透過將其指定為單向channel變量,從而限制該函數中可以對此channel的操作。

單向channel變數的宣告:

var ch1 chan int      // 普通channel
var ch2 chan <- int    // 只用于写int数据
var ch3 <-chan int    // 只用于读int数据

可以透過型別轉換,將一個channel轉換為單向的:

ch4 := make(chan int)
ch5 := <-chan int(ch4)   // 单向读
ch6 := chan<- int(ch4)  //单向写

單向channel的作用有點類似c中的const關鍵字,用於遵循代碼“最小權限原則”。

例如在一個函數中使用單向讀channel:

func Parse(ch <-chan int) {
    for value := range ch {
        fmt.Println("Parsing value", value) 
    }
}

channel作為一種原生類型,本身也可以透過channel進行傳遞,例如下面這個流式處理結構:

type PipeData struct {
    value int
    handler func(int) int
    next chan int
}

func handle(queue chan *PipeData) {
    for data := range queue {
        data.next <- data.handler(data.value)
    }
}

select

在UNIX中,select()函數用來監控一組描述符,該機制常被用於實現高並發的socket伺服器程式。 Go語言直接在語言層級支援select關鍵字,用於處理非同步IO問題,大致結構如下:

select {
    case <- chan1:
    // 如果chan1成功读到数据
    
    case chan2 <- 1:
    // 如果成功向chan2写入数据

    default:
    // 默认分支
}

select預設是阻塞的,只有當監聽的channel中有發送或接收可以進行時才會運行,當多個channel都準備好的時候,select是隨機的選擇一個執行的。

Go语言没有对channel提供直接的超时处理机制,但我们可以利用select来间接实现,例如:

timeout := make(chan bool, 1)

go func() {
    time.Sleep(1e9)
    timeout <- true
}()

switch {
    case <- ch:
    // 从ch中读取到数据

    case <- timeout:
    // 没有从ch中读取到数据,但从timeout中读取到了数据
}

这样使用select就可以避免永久等待的问题,因为程序会在timeout中获取到一个数据后继续执行,而无论对ch的读取是否还处于等待状态。

并发

早期版本的Go编译器并不能很智能的发现和利用多核的优势,即使在我们的代码中创建了多个goroutine,但实际上所有这些goroutine都允许在同一个CPU上,在一个goroutine得到时间片执行的时候其它goroutine都会处于等待状态。

实现下面的代码可以显式指定编译器将goroutine调度到多个CPU上运行。

import "runtime"...
runtime.GOMAXPROCS(4)

PS:runtime包中有几个处理goroutine的函数,

GoLang中協程圖文詳解

调度

Go调度的几个概念:

M:内核线程;

G:go routine,并发的最小逻辑单元,由程序员创建;

P:处理器,执行G的上下文环境,每个P会维护一个本地的go routine队列;

GoLang中協程圖文詳解

 除了每个P拥有一个本地的go routine队列外,还存在一个全局的go routine队列。

具体调度原理:

1、P的数量在初始化由GOMAXPROCS决定;

2、我们要做的就是添加G;

3、G的数量超出了M的处理能力,且还有空余P的话,runtime就会自动创建新的M;

4、M拿到P后才能干活,取G的顺序:本地队列>全局队列>其他P的队列,如果所有队列都没有可用的G,M会归还P并进入休眠;

一个G如果发生阻塞等事件会进行阻塞,如下图:

GoLang中協程圖文詳解

G发生上下文切换条件:

系统调用;

读写channel;

gosched主动放弃,会将G扔进全局队列;

如上图,一个G发生阻塞时,M0让出P,由M1接管其任务队列;当M0执行的阻塞调用返回后,再将G0扔到全局队列,自己则进入睡眠(没有P了无法干活);

以上是GoLang中協程圖文詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:cnblogs.com。如有侵權,請聯絡admin@php.cn刪除