程式碼結構
#我們建立了一個通用的workerPool 套件,根據業務所需的並發性使用worker 來處理任務。一起來看下目錄結構:
workerpool ├── pool.go ├── task.go └── worker.go
workerpool 目錄在專案的根目錄下。 Task 是需要處理單一工作單元;Worker 是一個簡單的 worker 函數,用於執行任務;而 Pool 則用於建立、管理 workers。
先看下Task 程式碼:
// workerpool/task.go package workerpool import ( "fmt" ) type Task struct { Err error Data interface{} f func(interface{}) error } func NewTask(f func(interface{}) error, data interface{}) *Task { return &Task{f: f, Data: data} } func process(workerID int, task *Task) { fmt.Printf("Worker %d processes task %v\n", workerID, task.Data) task.Err = task.f(task.Data) }
Task 是一個簡單的結構體,保存處理任務所需要的一切數據。建立 task 時,傳遞了 Data 和待執行函數 f,process() 函數會處理任務。處理任務時,將 Data 作為參數傳遞給函數 f,並將執行結果保存在 Task.Err 裡。
我們來看看 Worker 是如何處理任務的:
// workerpool/worker.go package workerpool import ( "fmt" "sync" ) // Worker handles all the work type Worker struct { ID int taskChan chan *Task } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, } } // Start starts the worker func (wr *Worker) Start(wg *sync.WaitGroup) { fmt.Printf("Starting worker %d\n", wr.ID) wg.Add(1) go func() { defer wg.Done() for task := range wr.taskChan { process(wr.ID, task) } }() }
我們建立了一個小巧的 Worker 結構體,包含 worker ID 和 一個保存待處理任務的 channel。在 Start() 方法裡,使用 for range 從 taskChan 讀取任務並處理。可以想像的到,多個 worker 可以並發地執行任務。
我們透過實作Task 和Worker 來處理任務,但是好像還缺點什麼東西,誰負責產生這些worker 並將任務發送給它們?答案是:Worker Pool。
// workerpoo/pool.go package workerpool import ( "fmt" "sync" "time" ) // Pool is the worker pool type Pool struct { Tasks []*Task concurrency int collector chan *Task wg sync.WaitGroup } // NewPool initializes a new pool with the given tasks and // at the given concurrency. func NewPool(tasks []*Task, concurrency int) *Pool { return &Pool{ Tasks: tasks, concurrency: concurrency, collector: make(chan *Task, 1000), } } // Run runs all work within the pool and blocks until it's // finished. func (p *Pool) Run() { for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) worker.Start(&p.wg) } for i := range p.Tasks { p.collector <- p.Tasks[i] } close(p.collector) p.wg.Wait() }
上面的程式碼,pool 保存了所有待處理的任務,並且產生與 concurrency 數量一致的 goroutine,用於並發地處理任務。 workers 之間共用快取 channel -- collector。
所以,當我們把這個工作池跑起來時,可以產生滿足所需數量的 worker,workers 之間共享 collector channel。接著,使用 for range 讀取 tasks,並將讀取到的 task 寫入 collector 裡。我們使用 sync.WaitGroup 來實現協程之間的同步。現在我們有了一個很好的解決方案,一起來測試下。
// main.go package main import ( "fmt" "time" "github.com/Joker666/goworkerpool/workerpool" ) func main() { var allTask []*workerpool.Task for i := 1; i <= 100; i++ { task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, i) allTask = append(allTask, task) } pool := workerpool.NewPool(allTask, 5) pool.Run() }
上面的程式碼,創建了 100 個任務並且使用 5 個並發處理這些任務。
输出如下:
Worker 3 processes task 98 Task 92 processed Worker 2 processes task 99 Task 98 processed Worker 5 processes task 100 Task 99 processed Task 100 processed Took ===============> 2.0056295s
处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。
我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。
实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:
// workerpool/worker.go // Worker handles all the work type Worker struct { ID int taskChan chan *Task quit chan bool } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, quit: make(chan bool), } } .... // StartBackground starts the worker in background waiting func (wr *Worker) StartBackground() { fmt.Printf("Starting worker %d\n", wr.ID) for { select { case task := <-wr.taskChan: process(wr.ID, task) case <-wr.quit: return } } } // Stop quits the worker func (wr *Worker) Stop() { fmt.Printf("Closing worker %d\n", wr.ID) go func() { wr.quit <- true }() }
Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。
添加完这两个新的方法之后,我们来修改下 Pool:
// workerpool/pool.go type Pool struct { Tasks []*Task Workers []*Worker concurrency int collector chan *Task runBackground chan bool wg sync.WaitGroup } // AddTask adds a task to the pool func (p *Pool) AddTask(task *Task) { p.collector <- task } // RunBackground runs the pool in background func (p *Pool) RunBackground() { go func() { for { fmt.Print("⌛ Waiting for tasks to come in ...\n") time.Sleep(10 * time.Second) } }() for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) p.Workers = append(p.Workers, worker) go worker.StartBackground() } for i := range p.Tasks { p.collector <- p.Tasks[i] } p.runBackground = make(chan bool) <-p.runBackground } // Stop stops background workers func (p *Pool) Stop() { for i := range p.Workers { p.Workers[i].Stop() } p.runBackground <- true }
Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。
添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。
我们来看下具体是如何工作的。
如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。
// main.go ... pool := workerpool.NewPool(allTask, 5) go func() { for { taskID := rand.Intn(100) + 20 if taskID%7 == 0 { pool.Stop() } time.Sleep(time.Duration(rand.Intn(5)) * time.Second) task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, taskID) pool.AddTask(task) } }() pool.RunBackground()
当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。
以上是Go語言的並發與WorkerPool - 第二部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!