首页  >  文章  >  后端开发  >  Go 中的 Apache Beam ParDo 过滤器

Go 中的 Apache Beam ParDo 过滤器

王林
王林转载
2024-02-05 11:57:581024浏览

Go 中的 Apache Beam ParDo 过滤器

问题内容

我是一名 python 开发人员,但应该使用 go 制作数据流管道。 与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。

我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。

func filterage(list user) user {
    return list.age > 40    
}

在 python 中,我可以编写如下所示的函数。

beam.Filter(lambda line: line["Age"] >= 40))

正确答案


您需要在函数中添加一个发射器来发射用户:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

正如您当前代码中所写, 返回 list.age > 40 list.age > 40 首先评估为 true(布尔值),并且返回该布尔值。

以上是Go 中的 Apache Beam ParDo 过滤器的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文转载于:stackoverflow.com。如有侵权,请联系admin@php.cn删除