首頁  >  文章  >  後端開發  >  Go 中的 Apache Beam ParDo 濾鏡

Go 中的 Apache Beam ParDo 濾鏡

王林
王林轉載
2024-02-05 11:57:58985瀏覽

Go 中的 Apache Beam ParDo 过滤器

問題內容

我是 python 開發人員,但應該使用 go 製作資料流管道。 與 python 或 java 相比,我找不到那麼多使用 go 的 apache beam 範例。

我有以下程式碼,其中具有使用者名稱和年齡的結構。任務是增加年齡,然後根據年齡進行過濾。我找到了增加年齡的方法,但卡在過濾部分。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

我嘗試建立一個如下所示的函數,但這會傳回一個布林值而不是使用者物件。我知道我錯過了一些簡單但無法弄清楚的事情。

func filterage(list user) user {
    return list.age > 40    
}

在 python 中,我可以寫如下所示的函數。

beam.Filter(lambda line: line["Age"] >= 40))

正確答案


您需要在函數中新增一個發射器來發射使用者:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

如您目前程式碼所寫, 傳回 list.age > 40 list.age > 40 首先評估為 true(布林值),並且傳回該布林值。

以上是Go 中的 Apache Beam ParDo 濾鏡的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除