搜索
首页后端开发GolangGo语言的并发与WorkerPool - 第一部分


现代编程语言中,并发已经成为必不可少的特性。现在绝大多数编程语言都有一些方法实现并发。

其中一些实现方式非常强大,能将负载转移到不同的系统线程,比如 Java 等;一些则在同一线程上模拟这种行为,比如 Ruby 等。

Golang 的并发模型非常强大,称为 CSP(通信顺序进程),它将一个问题分解成更小的顺序进程,然后调度这些进程的实例(称为 Goroutine)。这些进程通过 channel 传递信息实现通信。

本文,我们将探讨如何利用 golang 的并发性,以及如何在 workerPool 使用。系列文章的第二篇,我们将探讨如何构建一个强大的并发解决方案。

一个简单的例子

假设我们需要调用一个外部 API 接口,整个过程需要花费 100ms。如果我们需要同步地调用该接口 1000 次,则需要花费 100s。

//// model/data.go

package model

type SimpleData struct {
 ID int
}

//// basic/basic.go

package basic

import (
 "fmt"
 "github.com/Joker666/goworkerpool/model"
 "time"
)

func Work(allData []model.SimpleData) {
 start := time.Now()
 for i, _ := range allData {
  Process(allData[i])
 }
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func Process(data model.SimpleData) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
 "fmt"
 "github.com/Joker666/goworkerpool/basic"
 "github.com/Joker666/goworkerpool/model"
 "github.com/Joker666/goworkerpool/worker"
)

func main() {
 // Prepare the data
 var allData []model.SimpleData
 for i := 0; i < 1000; i++ {
  data := model.SimpleData{ ID: i }
  allData = append(allData, data)
 }
 fmt.Printf("Start processing all work \n")

 // Process
 basic.Work(allData)
}
Start processing all work
Took ===============> 1m40.226679665s

上面的代码创建了 model 包,包里包含一个结构体,这个结构体只有一个 int 类型的成员。我们同步地处理 data,这显然不是最佳方案,因为可以并发处理这些任务。我们换一种方案,使用 goroutine 和 channel 来处理。

异步

//// worker/notPooled.go

func NotPooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup

 dataCh := make(chan model.SimpleData, 100)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for data := range dataCh {
   wg.Add(1)
   go func(data model.SimpleData) {
    defer wg.Done()
    basic.Process(data)
   }(data)
  }
 }()

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)
Start processing all work
Took ===============> 101.191534ms

上面的代码,我们创建了容量 100 的缓存 channel,并通过 NoPooledWork() 将数据 push 到 channel 里。channel 长度满 100 之后,我们是无法再向其中添加元素直到有元素被读取走。使用 for range 读取 channel,并生成 goroutine 处理。这里我们没有限制生成 goroutine 的数量,这可以尽可能多地处理任务。从理论上来讲,在给定所需资源的情况下,可以处理尽可能多的数据。执行代码,完成 1000 个任务只花费了 100ms。很疯狂吧!不全是,接着往下看。

问题

除非我们拥有地球上所有的资源,否则在特定时间内能够分配的资源是有限的。一个 goroutine 占用的最小内存是 2k,但也能达到 1G。上述并发执行所有任务的解决方案中,假设有一百万个任务,就会很快耗尽机器的内存和 CPU。我们要么升级机器的配置,要么就寻找其他更好的解决方案。

计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。

解决方案:Worker Pool

我们通过实现 worker pool 来修复之前遇到的问题。

//// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    basic.Process(data)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)
Start processing all work
Took ===============> 1.002972449s

上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。

Go 语言的 channel 可以当作队列使用。

这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。

处理错误

我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。

//// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)
 errors := make(chan error, 1000)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    process(data, errors)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for {
   select {
   case err := <-errors:
    fmt.Println("finished with error:", err.Error())
   case <-time.After(time.Second * 1):
    fmt.Println("Timeout: errors finished")
    return
   }
  }
 }()

 defer close(errors)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 if data.ID % 29 == 0 {
  errors <- fmt.Errorf("error on job %v", data.ID)
 } else {
  fmt.Printf("Finish processing %d\n", data.ID)
 }
}

//// main.go

// Process
worker.PooledWorkError(allData)

我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。

比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,

我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。

总结

Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!

以上是Go语言的并发与WorkerPool - 第一部分的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:Go语言进阶学习。如有侵权,请联系admin@php.cn删除
使用GO开发时的安全考虑使用GO开发时的安全考虑Apr 27, 2025 am 12:18 AM

Gooffersrobustfeaturesforsecurecoding,butdevelopersmustimplementsecuritybestpracticeseffectively.1)UseGo'scryptopackageforsecuredatahandling.2)Manageconcurrencywithsynchronizationprimitivestopreventraceconditions.3)SanitizeexternalinputstoavoidSQLinj

了解GO的错误接口了解GO的错误接口Apr 27, 2025 am 12:16 AM

Go的错误接口定义为typeerrorinterface{Error()string},允许任何实现Error()方法的类型被视为错误。使用步骤如下:1.基本检查和记录错误,例如iferr!=nil{log.Printf("Anerroroccurred:%v",err)return}。2.创建自定义错误类型以提供更多信息,如typeMyErrorstruct{MsgstringDetailstring}。3.使用错误包装(自Go1.13起)来添加上下文而不丢失原始错误信息,

并发程序中的错误处理并发程序中的错误处理Apr 27, 2025 am 12:13 AM

对效率的Handleerrorsinconcurrentgopragrs,UsechannelstocommunicateErrors,EmparterRorwatchers,InsterTimeouts,UsebufferedChannels和Provideclearrormessages.1)USEchannelelStopassErstopassErrorsErtopassErrorsErrorsFromGoroutInestotheStothemainfunction.2)

您如何在GO中实现接口?您如何在GO中实现接口?Apr 27, 2025 am 12:09 AM

在Go语言中,接口的实现是通过隐式的方式进行的。1)隐式实现:类型只要包含接口定义的所有方法,就自动满足该接口。2)空接口:interface{}类型所有类型都实现,适度使用可避免类型安全问题。3)接口隔离:设计小而专注的接口,提高代码的可维护性和重用性。4)测试:接口有助于通过模拟依赖进行单元测试。5)错误处理:通过接口可以统一处理错误。

将GO接口与其他语言的接口进行比较(例如Java,C#)将GO接口与其他语言的接口进行比较(例如Java,C#)Apr 27, 2025 am 12:06 AM

go'sinterfacesareimpliclyimplysed,与Javaandc#wheRequireexplitiCimplation.1)Ingo,AnyTypewithTheRequiredMethodSautSautsautautapitymethodimimplementalyimimplementsaninternItherninternionterface,callingingSimplicity andficityity.2)

初始功能和副作用:平衡初始化与可维护性初始功能和副作用:平衡初始化与可维护性Apr 26, 2025 am 12:23 AM

Toensureinitfunctionsareeffectiveandmaintainable:1)Minimizesideeffectsbyreturningvaluesinsteadofmodifyingglobalstate,2)Ensureidempotencytohandlemultiplecallssafely,and3)Breakdowncomplexinitializationintosmaller,focusedfunctionstoenhancemodularityandm

开始GO:初学者指南开始GO:初学者指南Apr 26, 2025 am 12:21 AM

goisidealforbeginnersandsubableforforcloudnetworkservicesduetoitssimplicity,效率和concurrencyFeatures.1)installgromtheofficialwebsitealwebsiteandverifywith'.2)

进行并发模式:开发人员的最佳实践进行并发模式:开发人员的最佳实践Apr 26, 2025 am 12:20 AM

开发者应遵循以下最佳实践:1.谨慎管理goroutines以防止资源泄漏;2.使用通道进行同步,但避免过度使用;3.在并发程序中显式处理错误;4.了解GOMAXPROCS以优化性能。这些实践对于高效和稳健的软件开发至关重要,因为它们确保了资源的有效管理、同步的正确实现、错误的适当处理以及性能的优化,从而提升软件的效率和可维护性。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

SublimeText3 英文版

SublimeText3 英文版

推荐:为Win版本,支持代码提示!