现代编程语言中,并发已经成为必不可少的特性。现在绝大多数编程语言都有一些方法实现并发。
其中一些实现方式非常强大,能将负载转移到不同的系统线程,比如 Java 等;一些则在同一线程上模拟这种行为,比如 Ruby 等。
Golang 的并发模型非常强大,称为 CSP(通信顺序进程),它将一个问题分解成更小的顺序进程,然后调度这些进程的实例(称为 Goroutine)。这些进程通过 channel 传递信息实现通信。
本文,我们将探讨如何利用 golang 的并发性,以及如何在 workerPool 使用。系列文章的第二篇,我们将探讨如何构建一个强大的并发解决方案。
一个简单的例子
假设我们需要调用一个外部 API 接口,整个过程需要花费 100ms。如果我们需要同步地调用该接口 1000 次,则需要花费 100s。
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
上面的代码创建了 model 包,包里包含一个结构体,这个结构体只有一个 int 类型的成员。我们同步地处理 data,这显然不是最佳方案,因为可以并发处理这些任务。我们换一种方案,使用 goroutine 和 channel 来处理。
异步
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
上面的代码,我们创建了容量 100 的缓存 channel,并通过 NoPooledWork() 将数据 push 到 channel 里。channel 长度满 100 之后,我们是无法再向其中添加元素直到有元素被读取走。使用 for range 读取 channel,并生成 goroutine 处理。这里我们没有限制生成 goroutine 的数量,这可以尽可能多地处理任务。从理论上来讲,在给定所需资源的情况下,可以处理尽可能多的数据。执行代码,完成 1000 个任务只花费了 100ms。很疯狂吧!不全是,接着往下看。
问题
除非我们拥有地球上所有的资源,否则在特定时间内能够分配的资源是有限的。一个 goroutine 占用的最小内存是 2k,但也能达到 1G。上述并发执行所有任务的解决方案中,假设有一百万个任务,就会很快耗尽机器的内存和 CPU。我们要么升级机器的配置,要么就寻找其他更好的解决方案。
计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。
解决方案:Worker Pool
我们通过实现 worker pool 来修复之前遇到的问题。
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。
Go 语言的 channel 可以当作队列使用。
这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。
处理错误
我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。
比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,
我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。
总结
Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
以上是Go语言的并发与WorkerPool - 第一部分的详细内容。更多信息请关注PHP中文网其他相关文章!

Gooffersrobustfeaturesforsecurecoding,butdevelopersmustimplementsecuritybestpracticeseffectively.1)UseGo'scryptopackageforsecuredatahandling.2)Manageconcurrencywithsynchronizationprimitivestopreventraceconditions.3)SanitizeexternalinputstoavoidSQLinj

Go的错误接口定义为typeerrorinterface{Error()string},允许任何实现Error()方法的类型被视为错误。使用步骤如下:1.基本检查和记录错误,例如iferr!=nil{log.Printf("Anerroroccurred:%v",err)return}。2.创建自定义错误类型以提供更多信息,如typeMyErrorstruct{MsgstringDetailstring}。3.使用错误包装(自Go1.13起)来添加上下文而不丢失原始错误信息,

对效率的Handleerrorsinconcurrentgopragrs,UsechannelstocommunicateErrors,EmparterRorwatchers,InsterTimeouts,UsebufferedChannels和Provideclearrormessages.1)USEchannelelStopassErstopassErrorsErtopassErrorsErrorsFromGoroutInestotheStothemainfunction.2)

在Go语言中,接口的实现是通过隐式的方式进行的。1)隐式实现:类型只要包含接口定义的所有方法,就自动满足该接口。2)空接口:interface{}类型所有类型都实现,适度使用可避免类型安全问题。3)接口隔离:设计小而专注的接口,提高代码的可维护性和重用性。4)测试:接口有助于通过模拟依赖进行单元测试。5)错误处理:通过接口可以统一处理错误。

go'sinterfacesareimpliclyimplysed,与Javaandc#wheRequireexplitiCimplation.1)Ingo,AnyTypewithTheRequiredMethodSautSautsautautapitymethodimimplementalyimimplementsaninternItherninternionterface,callingingSimplicity andficityity.2)

Toensureinitfunctionsareeffectiveandmaintainable:1)Minimizesideeffectsbyreturningvaluesinsteadofmodifyingglobalstate,2)Ensureidempotencytohandlemultiplecallssafely,and3)Breakdowncomplexinitializationintosmaller,focusedfunctionstoenhancemodularityandm

goisidealforbeginnersandsubableforforcloudnetworkservicesduetoitssimplicity,效率和concurrencyFeatures.1)installgromtheofficialwebsitealwebsiteandverifywith'.2)

开发者应遵循以下最佳实践:1.谨慎管理goroutines以防止资源泄漏;2.使用通道进行同步,但避免过度使用;3.在并发程序中显式处理错误;4.了解GOMAXPROCS以优化性能。这些实践对于高效和稳健的软件开发至关重要,因为它们确保了资源的有效管理、同步的正确实现、错误的适当处理以及性能的优化,从而提升软件的效率和可维护性。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

安全考试浏览器
Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3汉化版
中文版,非常好用

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

Atom编辑器mac版下载
最流行的的开源编辑器

SublimeText3 英文版
推荐:为Win版本,支持代码提示!