>  기사  >  백엔드 개발  >  Go 언어의 동시성 및 WorkerPool - 1부

Go 언어의 동시성 및 WorkerPool - 1부

Go语言进阶学习
Go语言进阶学习앞으로
2023-07-25 14:35:02569검색

동시성은 현대 프로그래밍 언어에서 필수적인 기능이 되었습니다. 대부분의 프로그래밍 언어에는 이제 동시성을 달성할 수 있는 몇 가지 방법이 있습니다.

이러한 구현 중 일부는 매우 강력하며 Java 등과 같은 다른 시스템 스레드로 로드를 이동할 수 있습니다. 일부는 Ruby 등과 같은 동일한 스레드에서 이 동작을 시뮬레이션합니다.

Golang의 동시성 모델은 CSP(Communicating Sequential Process)라고 불리는 매우 강력합니다. 이 모델은 문제를 더 작은 순차 프로세스로 나눈 다음 이러한 프로세스의 인스턴스(고루틴이라고 함)를 예약합니다. 이러한 프로세스는 채널을 통해 정보를 전달하여 통신합니다.

이 글에서는 golang의 동시성을 활용하는 방법과 이를 WorkerPool에서 사용하는 방법을 살펴보겠습니다. 시리즈의 두 번째 기사에서는 강력한 동시성 솔루션을 구축하는 방법을 살펴보겠습니다.

간단한 예

외부 API 인터페이스를 호출해야 하고 전체 프로세스에 100ms가 걸린다고 가정해 보겠습니다. 이 인터페이스를 동기적으로 1000번 호출해야 한다면 100초가 걸립니다.

//// model/data.go

package model

type SimpleData struct {
 ID int
}

//// basic/basic.go

package basic

import (
 "fmt"
 "github.com/Joker666/goworkerpool/model"
 "time"
)

func Work(allData []model.SimpleData) {
 start := time.Now()
 for i, _ := range allData {
  Process(allData[i])
 }
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func Process(data model.SimpleData) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
 "fmt"
 "github.com/Joker666/goworkerpool/basic"
 "github.com/Joker666/goworkerpool/model"
 "github.com/Joker666/goworkerpool/worker"
)

func main() {
 // Prepare the data
 var allData []model.SimpleData
 for i := 0; i < 1000; i++ {
  data := model.SimpleData{ ID: i }
  allData = append(allData, data)
 }
 fmt.Printf("Start processing all work \n")

 // Process
 basic.Work(allData)
}
Start processing all work
Took ===============> 1m40.226679665s

위 코드는 int 유형의 멤버가 하나만 있는 구조를 포함하는 모델 패키지를 생성합니다. 우리는 데이터를 동기식으로 처리하는데, 이러한 작업이 동시에 처리될 수 있기 때문에 이는 분명히 최적이 아닙니다. 솔루션을 변경하고 고루틴과 채널을 사용하여 처리해 보겠습니다.

Asynchronous

//// worker/notPooled.go

func NotPooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup

 dataCh := make(chan model.SimpleData, 100)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for data := range dataCh {
   wg.Add(1)
   go func(data model.SimpleData) {
    defer wg.Done()
    basic.Process(data)
   }(data)
  }
 }()

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)
Start processing all work
Took ===============> 101.191534ms

위 코드에서는 용량이 100인 캐시 채널을 생성하고 NoPooledWork()를 통해 해당 채널에 데이터를 푸시합니다. 채널 길이가 100에 도달한 후에는 요소를 읽을 때까지 요소를 추가할 수 없습니다. 범위를 사용하여 채널을 읽고 고루틴 처리를 생성합니다. 여기서는 가능한 한 많은 작업을 처리할 수 있는 생성된 고루틴의 수에 제한이 없습니다. 이론적으로는 필요한 리소스가 주어지면 최대한 많은 데이터를 처리할 수 있습니다. 코드를 실행하면 1000개의 작업을 완료하는 데 100ms밖에 걸리지 않았습니다. 미쳤어! 전부는 아닙니다. 계속 읽어보세요.

문제

지구상의 모든 자원이 없다면 주어진 시간에 할당할 수 있는 자원은 한계가 있습니다. 고루틴이 차지하는 최소 메모리는 2k이지만 1G에 도달할 수도 있습니다. 백만 개의 작업을 가정하여 모든 작업을 동시에 실행하는 위의 솔루션은 시스템의 메모리와 CPU를 빠르게 소모합니다. 기계 구성을 업그레이드하거나 다른 더 나은 솔루션을 찾아야 합니다.

计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。

解决方案:Worker Pool

我们通过实现 worker pool 来修复之前遇到的问题。

//// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    basic.Process(data)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)
Start processing all work
Took ===============> 1.002972449s

上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。

Go 语言的 channel 可以当作队列使用。

这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。

处理错误

我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。

//// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)
 errors := make(chan error, 1000)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    process(data, errors)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for {
   select {
   case err := <-errors:
    fmt.Println("finished with error:", err.Error())
   case <-time.After(time.Second * 1):
    fmt.Println("Timeout: errors finished")
    return
   }
  }
 }()

 defer close(errors)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 if data.ID % 29 == 0 {
  errors <- fmt.Errorf("error on job %v", data.ID)
 } else {
  fmt.Printf("Finish processing %d\n", data.ID)
 }
}

//// main.go

// Process
worker.PooledWorkError(allData)

我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。

比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,

我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。

总结

Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!

위 내용은 Go 언어의 동시성 및 WorkerPool - 1부의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 Go语言进阶学习에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제