>  기사  >  백엔드 개발  >  Go 언어의 동시성 및 WorkerPool - 2부

Go 언어의 동시성 및 WorkerPool - 2부

Go语言进阶学习
Go语言进阶学习앞으로
2023-07-21 10:47:451124검색

코드 구조

비즈니스에서 요구하는 동시성에 따라 작업자를 사용하여 작업을 처리하는 일반 WorkerPool 패키지를 만들었습니다. 디렉터리 구조를 살펴보겠습니다.

workerpool
├── pool.go
├── task.go
└── worker.go

workerpool 디렉터리는 프로젝트의 루트 디렉터리에 있습니다. 태스크는 처리해야 하는 단일 작업 단위입니다. 작업자는 작업을 수행하는 데 사용되는 간단한 작업자 기능이고 풀은 작업자를 생성하고 관리하는 데 사용됩니다.

implementation

먼저 태스크 코드를 살펴보세요.

// workerpool/task.go

package workerpool

import (
 "fmt"
)

type Task struct {
 Err  error
 Data interface{}
 f    func(interface{}) error
}

func NewTask(f func(interface{}) error, data interface{}) *Task {
 return &Task{f: f, Data: data}
}

func process(workerID int, task *Task) {
 fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
 task.Err = task.f(task.Data)
}

태스크는 태스크를 처리하는 데 필요한 모든 데이터를 저장하는 간단한 구조입니다. 태스크를 생성할 때 Data와 실행할 함수 f를 전달하고, process() 함수가 태스크를 처리하게 됩니다. 작업을 처리할 때 Data를 함수 f에 매개변수로 전달하고 실행 결과를 Task.Err에 저장합니다.

Worker가 작업을 처리하는 방법을 살펴보겠습니다.

// workerpool/worker.go

package workerpool

import (
 "fmt"
 "sync"
)

// Worker handles all the work
type Worker struct {
 ID       int
 taskChan chan *Task
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
 return &Worker{
  ID:       ID,
  taskChan: channel,
 }
}

// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
 fmt.Printf("Starting worker %d\n", wr.ID)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for task := range wr.taskChan {
   process(wr.ID, task)
  }
 }()
}

작업자 ID와 대기 중인 작업을 저장하는 채널을 포함하는 작은 Worker 구조를 만들었습니다. Start() 메서드에서 범위를 사용하여 taskChan에서 작업을 읽고 처리합니다. 상상할 수 있듯이 여러 작업자가 동시에 작업을 수행할 수 있습니다.

workerPool

Task와 Worker를 구현하여 태스크를 처리하는데 뭔가 빠진 것 같습니다. 이러한 워커를 생성하고 태스크를 전달하는 책임은 누구에게 있습니까? 대답은 작업자 풀입니다.

// workerpoo/pool.go

package workerpool

import (
 "fmt"
 "sync"
 "time"
)

// Pool is the worker pool
type Pool struct {
 Tasks   []*Task

 concurrency   int
 collector     chan *Task
 wg            sync.WaitGroup
}

// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
 return &Pool{
  Tasks:       tasks,
  concurrency: concurrency,
  collector:   make(chan *Task, 1000),
 }
}

// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
 for i := 1; i <= p.concurrency; i++ {
  worker := NewWorker(p.collector, i)
  worker.Start(&p.wg)
 }

 for i := range p.Tasks {
  p.collector <- p.Tasks[i]
 }
 close(p.collector)

 p.wg.Wait()
}

위 코드에서 풀은 보류 중인 모든 작업을 저장하고 작업의 동시 처리를 위해 동시성과 일치하는 여러 고루틴을 생성합니다. 공유 캐시 채널 - 작업자 간 수집기입니다.

그래서 이 작업 풀을 운영하면 필요한 수의 작업자를 생성할 수 있고 수집기 채널은 작업자 간에 공유됩니다. 다음으로 범위를 사용하여 작업을 읽고 읽기 작업을 수집기에 씁니다. 우리는 코루틴 간의 동기화를 달성하기 위해 sync.WaitGroup을 사용합니다. 이제 좋은 해결책이 있으므로 테스트해 보겠습니다.

// main.go

package main

import (
 "fmt"
 "time"

 "github.com/Joker666/goworkerpool/workerpool"
)

func main() {
 var allTask []*workerpool.Task
 for i := 1; i <= 100; i++ {
  task := workerpool.NewTask(func(data interface{}) error {
   taskID := data.(int)
   time.Sleep(100 * time.Millisecond)
   fmt.Printf("Task %d processed\n", taskID)
   return nil
  }, i)
  allTask = append(allTask, task)
 }

 pool := workerpool.NewPool(allTask, 5)
 pool.Run()
}

위 코드는 100개의 작업을 생성하고 5개의 동시성을 사용하여 이러한 작업을 처리합니다.

输出如下:

Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s

处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。

我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。

进一步扩展:后台处理任务

实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:

// workerpool/worker.go

// Worker handles all the work
type Worker struct {
 ID       int
 taskChan chan *Task
 quit     chan bool
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
 return &Worker{
  ID:       ID,
  taskChan: channel,
  quit:     make(chan bool),
 }
}

....

// StartBackground starts the worker in background waiting
func (wr *Worker) StartBackground() {
 fmt.Printf("Starting worker %d\n", wr.ID)

 for {
  select {
  case task := <-wr.taskChan:
   process(wr.ID, task)
  case <-wr.quit:
   return
  }
 }
}

// Stop quits the worker
func (wr *Worker) Stop() {
 fmt.Printf("Closing worker %d\n", wr.ID)
 go func() {
  wr.quit <- true
 }()
}

Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。

添加完这两个新的方法之后,我们来修改下 Pool:

// workerpool/pool.go

type Pool struct {
 Tasks   []*Task
 Workers []*Worker

 concurrency   int
 collector     chan *Task
 runBackground chan bool
 wg            sync.WaitGroup
}

// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
 p.collector <- task
}

// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
 go func() {
  for {
   fmt.Print("⌛ Waiting for tasks to come in ...\n")
   time.Sleep(10 * time.Second)
  }
 }()

 for i := 1; i <= p.concurrency; i++ {
  worker := NewWorker(p.collector, i)
  p.Workers = append(p.Workers, worker)
  go worker.StartBackground()
 }

 for i := range p.Tasks {
  p.collector <- p.Tasks[i]
 }

 p.runBackground = make(chan bool)
 <-p.runBackground
}

// Stop stops background workers
func (p *Pool) Stop() {
 for i := range p.Workers {
  p.Workers[i].Stop()
 }
 p.runBackground <- true
}

Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。

添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。

我们来看下具体是如何工作的。

如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。

// main.go

...

pool := workerpool.NewPool(allTask, 5)
go func() {
 for {
  taskID := rand.Intn(100) + 20

  if taskID%7 == 0 {
   pool.Stop()
  }

  time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
  task := workerpool.NewTask(func(data interface{}) error {
   taskID := data.(int)
   time.Sleep(100 * time.Millisecond)
   fmt.Printf("Task %d processed\n", taskID)
   return nil
  }, taskID)
  pool.AddTask(task)
 }
}()
pool.RunBackground()

当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。

위 내용은 Go 언어의 동시성 및 WorkerPool - 2부의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 Go语言进阶学习에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제