首页 >后端开发 >Golang >Go 中的管道并发模式:综合视觉指南

Go 中的管道并发模式:综合视觉指南

DDD
DDD原创
2024-12-31 14:23:11486浏览

⚠️ 这个系列如何进行?

1。运行每个示例:不要只阅读代码。输入它,运行它,然后观察其行为。
2。实验和打破常规: 删除睡眠并看看会发生什么,更改通道缓冲区大小,修改 goroutine 计数。
打破东西会教你它们是如何工作的
3。关于行为的原因: 在运行修改后的代码之前,尝试预测结果。当您看到意外行为时,请停下来思考原因。挑战解释。
4。建立心理模型:每个可视化代表一个概念。尝试为修改后的代码绘制自己的图表。

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

在上一篇文章中,我们探讨了生成器并发模式,它是 Go 其他并发模式的构建块。您可以在这里阅读:

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

Go 中的生成器并发模式:视觉指南

Souvik Kar Mahapatra ・ 12 月 25 日

#去 #教程 #编程 #学习

现在,让我们看看这些原语如何组合起来形成解决现实世界问题的强大模式。

在这篇文章中,我们将介绍管道模式并尝试将它们可视化。因此,让我们做好准备,因为我们将亲手完成整个过程。

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

管道模式

管道就像工厂中的装配线,每个阶段对数据执行特定任务,并将结果传递到下一个阶段。

我们通过将 goroutine 与通道连接来构建管道,其中 每个 goroutine 代表一个阶段,用于接收数据、处理数据并将其发送到下一个阶段。

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

让我们实现一个简单的管道:

  1. 生成数字
  2. 将它们平方
  3. 打印结果
// Stage 1: Generate numbers
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// Stage 3: Print numbers
func print(in <-chan int) {
    for n := range in {
        fmt.Printf("%d ", n)
    }
    fmt.Println()
}

func main() {
    // Connect the pipeline
    numbers := generate(2, 3, 4)    // Stage 1
    squares := square(numbers)       // Stage 2
    print(squares)                   // Stage 3
}

✏️快字节


chan int 这表示双向通道。
chan int 类型的通道可用于发送和接收值。

让我们继续想象一下上面的例子:

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

在这里您可以看到管道的每个构建块都是遵循生成器模式的 goroutine。这意味着只要数据在任何步骤准备好,管道中的下一步就可以开始处理它,这与顺序处理不同。

管道中的错误处理

核心原则应该是:

  1. 每个阶段都清楚地知道如何处理好的和坏的值
  2. 错误不会在管道中丢失
  3. 错误的值不会引起恐慌
  4. 错误消息包含有关出错原因的上下文
  5. 管道可以扩展更多阶段,并且它们都会一致地处理错误

让我们用一些正确的错误处理来更新我们的代码。

type Result struct {
    Value int
    Err   error
}

func generateWithError(nums ...int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for _, n := range nums {
            if n < 0 {
                out <- Result{Err: fmt.Errorf("negative number: %d", n)}
                return
            }
            out <- Result{Value: n}
        }
    }()
    return out
}

func squareWithError(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for r := range in {
            if r.Err != nil {
                out <- r  // Forward the error
                continue
            }
            out <- Result{Value: r.Value * r.Value}
        }
    }()
    return out
}

func main() {
    // Using pipeline with error handling
    for result := range squareWithError(generateWithError(2, -3, 4)) {
        if result.Err != nil {
            fmt.Printf("Error: %v\n", result.Err)
            continue
        }
        fmt.Printf("Result: %d\n", result.Value)
    }
}

为什么使用管道模式?

让我们举个例子来更好地理解,我们有一个遵循管道模式的数据处理工作流程,如下所示。

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

  1. 管道中的每个阶段都是独立运行的,仅通过通道进行通信。这有几个好处:

?每个阶段都可以独立开发、测试和修改
?对一个阶段内部结构的更改不会影响其他阶段
?轻松添加新阶段或修改现有阶段
?明确的关注点分离

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

  1. 管道模式自然地支持并行/并发处理。一旦数据可用,每个阶段都可以同时处理不同的数据。

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

最好的部分是什么?我们可以运行每个阶段的多个实例(工作人员)以满足更多并发要求,如下所示:

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

??嘿,但这不是扇入和扇出并发模式吗?

宾果游戏!很好的收获就在那里。它确实是一种扇出、扇入模式,它是管道模式的一种特定类型。我们将在下一篇文章中详细介绍它,所以不用担心;)

现实世界用例

在管道中处理图像

// Stage 1: Generate numbers
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// Stage 3: Print numbers
func print(in <-chan int) {
    for n := range in {
        fmt.Printf("%d ", n)
    }
    fmt.Println()
}

func main() {
    // Connect the pipeline
    numbers := generate(2, 3, 4)    // Stage 1
    squares := square(numbers)       // Stage 2
    print(squares)                   // Stage 3
}

或者像日志处理管道一样复杂的东西

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

管道扩展模式

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

水平缩放(扇出、扇入)

此模式非常适合 CPU 密集型操作,其中工作可以独立处理。管道将工作分配给多个工作人员,然后重新组合结果。这在以下情况下特别有效:

  1. 处理是CPU密集型的(数据转换、计算)
  2. 可以独立处理任务
  3. 您有多个可用的 CPU 核心

缓冲管道

此模式有助于管理管道阶段之间的速度不匹配。缓冲器起到减震器的作用,允许快速阶段向前工作,而不会被较慢阶段阻挡。这在以下情况下很有用:

  1. 不同阶段有不同的处理速度
  2. 您想要保持稳定的吞吐量
  3. 缓冲的内存使用是可以接受的
  4. 您需要处理突发处理

批量处理

此模式通过将多个项目分组为单个批次来优化 I/O 密集型操作。它不是一次处理一个项目,而是将它们分组并一起处理。这在以下情况下有效:

  1. 您正在使用外部系统(数据库、API)
  2. 网络往返费用昂贵
  3. 该操作每个请求都有大量固定开销
  4. 您需要优化吞吐量而不是延迟

这些模式中的每一个都可以根据需要进行组合。例如,您可以使用水平扩展的批处理,其中多个工作人员每个处理一批项目。 关键是了解您的瓶颈并选择适当的模式来解决它们


我们对生成器模式的深入研究到此结束了!接下来,我们将探索管道并发模式,我们将了解如何将生成器链接在一起以构建强大的数据处理流程。

如果您发现这篇文章有帮助,有任何疑问,或者想分享您自己的生成器经验 - 我很乐意在下面的评论中听到您的意见。您的见解和问题有助于使这些解释对每个人来说都更好。

如果您错过了 Golang 的 goroutine 和通道的视觉指南,请在此处查看:

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

理解和可视化 Golang 中的 Goroutine 和 Channel

Souvik Kar Mahapatra ・ 12 月 20 日

#去 #编程 #学习 #教程

请继续关注更多 Go 并发模式! ?

Pipeline Concurrency Pattern in Go: A Comprehensive Visual Guide

以上是Go 中的管道并发模式:综合视觉指南的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn