并发使我们能够独立处理多个任务。 Goroutine 是一种独立处理多个任务的简单方法。在这篇文章中,我们逐步增强了一个 http 处理程序,该处理程序接受文件,并利用通道和同步包探索 Go 中的各种并发模式。
在进入并发模式之前,让我们先做好准备。想象一下,我们有一个 HTTP 处理程序,它通过表单接受多个文件并以某种方式处理这些文件。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
在上面的示例中,我们从表单接收文件并按顺序处理它们。如果上传 10 个文件,则需要 1 秒才能完成该过程并向客户端发送响应。
当处理许多文件时,这可能会成为瓶颈,但是通过 Go 的并发支持,我们可以轻松解决这个问题。
为了解决这个问题,我们可以并发处理文件。要生成一个新的 goroutine,我们可以在函数调用前加上 go 关键字,例如去处理文件(f)。然而,由于 goroutine 是非阻塞的,处理程序可能会在进程完成之前返回,从而导致文件可能未处理或返回不正确的状态。要等待所有文件的处理,我们可以使用sync.WaitGroup。
WaitGroup 等待多个 goroutine 完成,对于我们生成的每个 goroutine,我们还应该增加 WaitGroup 中的计数器,这可以通过 Add 函数来完成。当 goroutine 完成时,应该调用 Done,以便计数器减一。在从函数返回之前,应该调用 Wait,该函数会阻塞,直到 WaitGroup 的计数器为 0。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
现在,对于每个上传的文件,都会生成一个新的 goroutine,这可能会压垮系统。一种解决方案是限制生成的 goroutine 的数量。
信号量只是一个变量,我们可以用它来控制多个线程(或者在本例中为 goroutine)对公共资源的访问。
在 Go 中,我们可以利用缓冲通道来实现信号量。
通道是一个管道,我们可以通过它发送和接收数据,以便在 go 例程之间安全地通信。
通道必须使用 make 函数创建。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
让操作员指向通道 ch
该动画形象地展示了生产者通过无缓冲通道发送值 1 以及消费者从该通道读取数据的情况。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
回到最初的问题,我们要限制同时处理文件的 goroutine 数量。为此,我们可以利用缓冲通道。
ch := make(chan int)
在此示例中,我们添加了一个容量为 5 的缓冲通道,这使我们能够同时处理 5 个文件并限制系统压力。
简单地说,使用加权信号量,我们可以为单个任务分配更多资源。 Go 已经在扩展同步包中提供了加权信号量的实现。
ch := make(chan int, 2)
在此版本中,我们创建了一个具有 5 个槽的加权信号量,如果仅上传图像,例如进程会同时处理 5 个图像,但是如果上传 PDF,则会获取 2 个槽,这将减少可处理的文件量同时。
我们探索了 Go 中的一些并发模式,利用sync.WaitGroup 和信号量来控制并发任务的数量。然而,还有更多可用的工具,我们可以利用通道来创建工作池、添加超时或使用扇入/扇出模式。
处理错误的一种方法是利用通道来聚合错误并在所有 goroutine 完成后处理它们。
Go 还提供了 errgroup.Group ,它与sync.WaitGroups 相关,但添加了对返回错误的任务的处理。
以上是Goroutines 和 Channels:Go 中的并发模式的详细内容。更多信息请关注PHP中文网其他相关文章!