首頁 >後端開發 >Golang >Apache Beam 從 Go 中的 PCollection 中選擇前 N 行

Apache Beam 從 Go 中的 PCollection 中選擇前 N 行

PHPz
PHPz轉載
2024-02-10 17:48:08586瀏覽

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

Apache Beam 是一個開源的分散式資料處理框架,它提供了一個統一的程式設計模型,可以在不同的批次和串流處理引擎上運行。最近,Apache Beam 的 Go SDK 中新增了一個非常有用的功能——從 PCollection 中選擇前 N 行。這個功能對於需要對大型資料集進行取樣或快速預覽的場景非常有幫助。在本文中,我們將介紹如何在 Apache Beam 的 Go SDK 中使用這個功能,並展示一些實際的範例程式碼。讓我們開始吧!

問題內容

我有一個 pcollection,我需要從中選擇 n 個最大的行。我正在嘗試使用 go 創建資料流管道並陷入困境。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

從上面的程式碼中,我需要根據 user.age 選擇前 5 行 我發現鏈接頂部包具有相同的功能,但它說它返回單個元素 pcollection。有什麼不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我像上面一樣加入了選擇前 5 行的函數,但出現錯誤 []main.user is not allocate to main.user

我需要與以前相同格式的 pcollection,因為我需要進一步處理。我懷疑這是因為 top.largest 函數傳回單一元素 pcollection。關於如何轉換格式有什麼想法嗎?

解決方法

最好的 pcollection 是 []user

所以嘗試一下...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)

以上是Apache Beam 從 Go 中的 PCollection 中選擇前 N 行的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除