并发使我们能够独立处理多个任务。 Goroutine 是一种独立处理多个任务的简单方法。在这篇文章中,我们逐步增强了一个 http 处理程序,该处理程序接受文件,并利用通道和同步包探索 Go 中的各种并发模式。
在进入并发模式之前,让我们先做好准备。想象一下,我们有一个 HTTP 处理程序,它通过表单接受多个文件并以某种方式处理这些文件。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
在上面的示例中,我们从表单接收文件并按顺序处理它们。如果上传 10 个文件,则需要 1 秒才能完成该过程并向客户端发送响应。
当处理许多文件时,这可能会成为瓶颈,但是通过 Go 的并发支持,我们可以轻松解决这个问题。
为了解决这个问题,我们可以并发处理文件。要生成一个新的 goroutine,我们可以在函数调用前加上 go 关键字,例如去处理文件(f)。然而,由于 goroutine 是非阻塞的,处理程序可能会在进程完成之前返回,从而导致文件可能未处理或返回不正确的状态。要等待所有文件的处理,我们可以使用sync.WaitGroup。
WaitGroup 等待多个 goroutine 完成,对于我们生成的每个 goroutine,我们还应该增加 WaitGroup 中的计数器,这可以通过 Add 函数来完成。当 goroutine 完成时,应该调用 Done,以便计数器减一。在从函数返回之前,应该调用 Wait,该函数会阻塞,直到 WaitGroup 的计数器为 0。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
现在,对于每个上传的文件,都会生成一个新的 goroutine,这可能会压垮系统。一种解决方案是限制生成的 goroutine 的数量。
信号量只是一个变量,我们可以用它来控制多个线程(或者在本例中为 goroutine)对公共资源的访问。
在 Go 中,我们可以利用缓冲通道来实现信号量。
在进入实现之前,我们先来看看什么是通道以及缓冲通道和非缓冲通道之间的区别。
通道是一个管道,我们可以通过它发送和接收数据,以便在 go 例程之间安全地通信。
通道必须使用 make 函数创建。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
通道有一个特殊的运算符
让操作员指向通道 ch
该动画形象地展示了生产者通过无缓冲通道发送值 1 以及消费者从该通道读取数据的情况。
如果生产者发送事件的速度比消费者处理的速度快,那么我们可以选择利用缓冲通道来排队多个消息,而不会阻塞生产者,直到缓冲区已满。同时,消费者可以按照自己的节奏处理消息。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
在此示例中,生产者最多可以发送两个项目而不会阻塞。当缓冲区达到容量时,生产者将阻塞,直到消费者处理了至少一条消息。
回到最初的问题,我们要限制同时处理文件的 goroutine 数量。为此,我们可以利用缓冲通道。
ch := make(chan int)
在此示例中,我们添加了一个容量为 5 的缓冲通道,这使我们能够同时处理 5 个文件并限制系统压力。
但是如果并非所有文件都相等怎么办?我们可以可靠地预测不同的文件类型或文件大小需要更多的资源来处理。在这种情况下,我们可以使用加权信号量。
简单地说,使用加权信号量,我们可以为单个任务分配更多资源。 Go 已经在扩展同步包中提供了加权信号量的实现。
ch := make(chan int, 2)
在此版本中,我们创建了一个具有 5 个槽的加权信号量,如果仅上传图像,例如进程会同时处理 5 个图像,但是如果上传 PDF,则会获取 2 个槽,这将减少可处理的文件量同时。
我们探索了 Go 中的一些并发模式,利用sync.WaitGroup 和信号量来控制并发任务的数量。然而,还有更多可用的工具,我们可以利用通道来创建工作池、添加超时或使用扇入/扇出模式。
此外,错误处理是一个重要方面,但为了简单起见,大多数情况下都忽略了这一点。
处理错误的一种方法是利用通道来聚合错误并在所有 goroutine 完成后处理它们。
Go 还提供了 errgroup.Group ,它与sync.WaitGroups 相关,但添加了对返回错误的任务的处理。
该包可以在扩展同步包中找到。
以上是Goroutines 和 Channels:Go 中的并发模式的详细内容。更多信息请关注PHP中文网其他相关文章!