1。运行每个示例:不要只阅读代码。输入它,运行它,然后观察其行为。⚠️ 这个系列如何进行?
2。实验和打破常规: 删除睡眠并看看会发生什么,更改通道缓冲区大小,修改 goroutine 计数。
打破东西会教你它们是如何工作的
3。关于行为的原因: 在运行修改后的代码之前,尝试预测结果。当您看到意外行为时,请停下来思考原因。挑战解释。
4。建立心理模型:每个可视化代表一个概念。尝试为修改后的代码绘制自己的图表。
在上一篇文章中,我们探讨了生成器并发模式,它是 Go 其他并发模式的构建块。您可以在这里阅读:
现在,让我们看看这些原语如何组合起来形成解决现实世界问题的强大模式。
在这篇文章中,我们将介绍管道模式并尝试将它们可视化。因此,让我们做好准备,因为我们将亲手完成整个过程。
管道就像工厂中的装配线,每个阶段对数据执行特定任务,并将结果传递到下一个阶段。
我们通过将 goroutine 与通道连接来构建管道,其中 每个 goroutine 代表一个阶段,用于接收数据、处理数据并将其发送到下一个阶段。
让我们实现一个简单的管道:
// Stage 1: Generate numbers func generate(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } // Stage 2: Square numbers func square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out } // Stage 3: Print numbers func print(in <-chan int) { for n := range in { fmt.Printf("%d ", n) } fmt.Println() } func main() { // Connect the pipeline numbers := generate(2, 3, 4) // Stage 1 squares := square(numbers) // Stage 2 print(squares) // Stage 3 }
✏️快字节
chan int 这表示双向通道。
chan int 类型的通道可用于发送和接收值。
让我们继续想象一下上面的例子:
在这里您可以看到管道的每个构建块都是遵循生成器模式的 goroutine。这意味着只要数据在任何步骤准备好,管道中的下一步就可以开始处理它,这与顺序处理不同。
核心原则应该是:
让我们用一些正确的错误处理来更新我们的代码。
type Result struct { Value int Err error } func generateWithError(nums ...int) <-chan Result { out := make(chan Result) go func() { defer close(out) for _, n := range nums { if n < 0 { out <- Result{Err: fmt.Errorf("negative number: %d", n)} return } out <- Result{Value: n} } }() return out } func squareWithError(in <-chan Result) <-chan Result { out := make(chan Result) go func() { defer close(out) for r := range in { if r.Err != nil { out <- r // Forward the error continue } out <- Result{Value: r.Value * r.Value} } }() return out } func main() { // Using pipeline with error handling for result := range squareWithError(generateWithError(2, -3, 4)) { if result.Err != nil { fmt.Printf("Error: %v\n", result.Err) continue } fmt.Printf("Result: %d\n", result.Value) } }
让我们举个例子来更好地理解,我们有一个遵循管道模式的数据处理工作流程,如下所示。
?每个阶段都可以独立开发、测试和修改
?对一个阶段内部结构的更改不会影响其他阶段
?轻松添加新阶段或修改现有阶段
?明确的关注点分离
最好的部分是什么?我们可以运行每个阶段的多个实例(工作人员)以满足更多并发要求,如下所示:
??嘿,但这不是扇入和扇出并发模式吗?
宾果游戏!很好的收获就在那里。它确实是一种扇出、扇入模式,它是管道模式的一种特定类型。我们将在下一篇文章中详细介绍它,所以不用担心;)
在管道中处理图像
// Stage 1: Generate numbers func generate(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } // Stage 2: Square numbers func square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out } // Stage 3: Print numbers func print(in <-chan int) { for n := range in { fmt.Printf("%d ", n) } fmt.Println() } func main() { // Connect the pipeline numbers := generate(2, 3, 4) // Stage 1 squares := square(numbers) // Stage 2 print(squares) // Stage 3 }
或者像日志处理管道一样复杂的东西
此模式非常适合 CPU 密集型操作,其中工作可以独立处理。管道将工作分配给多个工作人员,然后重新组合结果。这在以下情况下特别有效:
此模式有助于管理管道阶段之间的速度不匹配。缓冲器起到减震器的作用,允许快速阶段向前工作,而不会被较慢阶段阻挡。这在以下情况下很有用:
此模式通过将多个项目分组为单个批次来优化 I/O 密集型操作。它不是一次处理一个项目,而是将它们分组并一起处理。这在以下情况下有效:
这些模式中的每一个都可以根据需要进行组合。例如,您可以使用水平扩展的批处理,其中多个工作人员每个处理一批项目。 关键是了解您的瓶颈并选择适当的模式来解决它们。
我们对生成器模式的深入研究到此结束了!接下来,我们将探索管道并发模式,我们将了解如何将生成器链接在一起以构建强大的数据处理流程。
如果您发现这篇文章有帮助,有任何疑问,或者想分享您自己的生成器经验 - 我很乐意在下面的评论中听到您的意见。您的见解和问题有助于使这些解释对每个人来说都更好。
如果您错过了 Golang 的 goroutine 和通道的视觉指南,请在此处查看:
请继续关注更多 Go 并发模式! ?
以上是Go 中的管道并发模式:综合视觉指南的详细内容。更多信息请关注PHP中文网其他相关文章!