在現代程式語言中,並發已成為不可或缺的特性。現在絕大多數程式語言都有一些方法實現並發。
其中一些實作方式非常強大,能將負載轉移到不同的系統線程,例如 Java 等;有些則在同一線程上模擬這種行為,例如 Ruby 等。
Golang 的並發模型非常強大,稱為 CSP(通訊順序進程),它將一個問題分解成更小的順序進程,然後調度這些進程的實例(稱為 Goroutine)。這些進程透過 channel 傳遞訊息實現通訊。
本文,我們將探討如何利用 golang 的並發性,以及如何在 workerPool 使用。在系列文章的第二篇,我們將探討如何建立一個強大的並發解決方案。
一個簡單的例子
假設我們需要呼叫一個外部 API 接口,整個過程需要花費 100ms。如果我們需要同步地呼叫該介面 1000 次,則需要花費 100s。
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
上面的程式碼建立了 model 包,包裡包含一個結構體,這個結構體只有一個 int 類型的成員。我們同步地處理 data,這顯然不是最佳方案,因為可以並發處理這些任務。我們換一個方案,使用 goroutine 和 channel 來處理。
非同步
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
上面的程式碼,我們創建了容量100 的快取channel,並透過NoPooledWork() 將資料push 到channel 裡。 channel 長度滿 100 之後,我們是無法再向其中添加元素直到有元素被讀取走。使用 for range 讀取 channel,並產生 goroutine 處理。這裡我們沒有限制生成 goroutine 的數量,這可以盡可能地處理任務。從理論上來講,在給定所需資源的情況下,可以處理盡可能多的資料。執行程式碼,完成 1000 個任務只花了 100ms。很瘋狂吧!不全是,接著往下看。
問題
除非我們擁有地球上所有的資源,否則在特定時間內能夠分配的資源是有限的。一個 goroutine 佔用的最小記憶體是 2k,但也能達到 1G。上述並發執行所有任務的解決方案中,假設有一百萬個任務,就會很快耗盡機器的記憶體和 CPU。我們要么升級機器的配置,要么就尋找其他更好的解決方案。
计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。
解决方案:Worker Pool
我们通过实现 worker pool 来修复之前遇到的问题。
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。
Go 语言的 channel 可以当作队列使用。
这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。
处理错误
我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。
比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,
我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。
总结
Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
以上是Go語言的並發與WorkerPool - 第一部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Gooffersrobustfeaturesforsecurecoding,butdevelopersmustimplementsecuritybestpracticeseffectively.1)UseGo'scryptopackageforsecuredatahandling.2)Manageconcurrencywithsynchronizationprimitivestopreventraceconditions.3)SanitizeexternalinputstoavoidSQLinj

Go的錯誤接口定義為typeerrorinterface{Error()string},允許任何實現Error()方法的類型被視為錯誤。使用步驟如下:1.基本檢查和記錄錯誤,例如iferr!=nil{log.Printf("Anerroroccurred:%v",err)return}。 2.創建自定義錯誤類型以提供更多信息,如typeMyErrorstruct{MsgstringDetailstring}。 3.使用錯誤包裝(自Go1.13起)來添加上下文而不丟失原始錯誤信息,

對效率的Handleerrorsinconcurrentgopragrs,UsechannelstocommunicateErrors,enplionErrorWatchers,Instertimeout,UsebufferedChannels和Provideclearrormessages.1)USEchannelelStopassErtopassErrorsErtopassErrorsErrorsErrorsFromGoroutInestOthemainFunction.2)

在Go語言中,接口的實現是通過隱式的方式進行的。 1)隱式實現:類型只要包含接口定義的所有方法,就自動滿足該接口。 2)空接口:interface{}類型所有類型都實現,適度使用可避免類型安全問題。 3)接口隔離:設計小而專注的接口,提高代碼的可維護性和重用性。 4)測試:接口有助於通過模擬依賴進行單元測試。 5)錯誤處理:通過接口可以統一處理錯誤。

go'sinterfacesareimpliclyimplyed,與Javaandc#wheRequireexplitiCimplation.1)Ingo,AnyTypeWithTheRequiredMethodSautSautSautautapitymethodimimplementsaninternionsaninterninternionsaninterface.2)

Toensureinitfunctionsareeffectiveandmaintainable:1)Minimizesideeffectsbyreturningvaluesinsteadofmodifyingglobalstate,2)Ensureidempotencytohandlemultiplecallssafely,and3)Breakdowncomplexinitializationintosmaller,focusedfunctionstoenhancemodularityandm

goisidealforbeginnersandsubableforforcloudnetworkservicesduetoitssimplicity,效率和concurrencyFeatures.1)installgromtheofficialwebsitealwebsiteandverifywith'.2)

開發者應遵循以下最佳實踐:1.謹慎管理goroutines以防止資源洩漏;2.使用通道進行同步,但避免過度使用;3.在並發程序中顯式處理錯誤;4.了解GOMAXPROCS以優化性能。這些實踐對於高效和穩健的軟件開發至關重要,因為它們確保了資源的有效管理、同步的正確實現、錯誤的適當處理以及性能的優化,從而提升軟件的效率和可維護性。


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器

SublimeText3 Linux新版
SublimeText3 Linux最新版

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),