我是一名 python 开发人员,但应该使用 go 制作数据流管道。 与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。
我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func init() { beam.registerfunction(incrementage) } type user struct { name string age int } func printrow(ctx context.context, list user) { fmt.println(list) } func incrementage(list user) user { list.age++ return list } func main() { flag.parse() beam.init() ctx := context.background() p := beam.newpipeline() s := p.root() var userlist = []user{ {"bob", 40}, {"adam", 50}, {"john", 35}, {"ben", 8}, } initial := beam.createlist(s, userlist) pc := beam.pardo(s, incrementage, initial) pc1 := beam.pardo(s, func(row user, emit func(user)) { emit(row) }, pc) beam.pardo0(s, printrow, pc1) if err := beamx.run(ctx, p); err != nil { log.exitf(ctx, "failed to execute job: %v", err) } }
我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。
func filterage(list user) user { return list.age > 40 }
在 python 中,我可以编写如下所示的函数。
beam.Filter(lambda line: line["Age"] >= 40))
您需要在函数中添加一个发射器来发射用户:
func filterAge(list user, emit func(user)) { if list.Age > 40 { emit(list) } }
正如您当前代码中所写, 返回 list.age > 40
list.age > 40
首先评估为 true(布尔值),并且返回该布尔值。
以上是Go 中的 Apache Beam ParDo 过滤器的详细内容。更多信息请关注PHP中文网其他相关文章!