我是 python 開發人員,但應該使用 go 製作資料流管道。 與 python 或 java 相比,我找不到那麼多使用 go 的 apache beam 範例。
我有以下程式碼,其中具有使用者名稱和年齡的結構。任務是增加年齡,然後根據年齡進行過濾。我找到了增加年齡的方法,但卡在過濾部分。
package main import ( "context" "flag" "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func init() { beam.registerfunction(incrementage) } type user struct { name string age int } func printrow(ctx context.context, list user) { fmt.println(list) } func incrementage(list user) user { list.age++ return list } func main() { flag.parse() beam.init() ctx := context.background() p := beam.newpipeline() s := p.root() var userlist = []user{ {"bob", 40}, {"adam", 50}, {"john", 35}, {"ben", 8}, } initial := beam.createlist(s, userlist) pc := beam.pardo(s, incrementage, initial) pc1 := beam.pardo(s, func(row user, emit func(user)) { emit(row) }, pc) beam.pardo0(s, printrow, pc1) if err := beamx.run(ctx, p); err != nil { log.exitf(ctx, "failed to execute job: %v", err) } }
我嘗試建立一個如下所示的函數,但這會傳回一個布林值而不是使用者物件。我知道我錯過了一些簡單但無法弄清楚的事情。
func filterage(list user) user { return list.age > 40 }
在 python 中,我可以寫如下所示的函數。
beam.Filter(lambda line: line["Age"] >= 40))
您需要在函數中新增一個發射器來發射使用者:
func filterAge(list user, emit func(user)) { if list.Age > 40 { emit(list) } }
如您目前程式碼所寫, 傳回 list.age > 40
list.age > 40
首先評估為 true(布林值),並且傳回該布林值。
以上是Go 中的 Apache Beam ParDo 濾鏡的詳細內容。更多資訊請關注PHP中文網其他相關文章!