首頁  >  文章  >  後端開發  >  Go 中的 Apache Beam 左連接

Go 中的 Apache Beam 左連接

WBOY
WBOY轉載
2024-02-11 09:12:07428瀏覽

Go 中的 Apache Beam 左连接

php小編小新在這裡跟大家介紹一下Go語言中的Apache Beam左邊連結。 Apache Beam是一種分散式資料處理框架,它提供了一種通用的程式設計模型,用於在不同的分散式資料處理引擎上執行批次和串流處理任務。而左連接是一種常見的資料處理操作,它可以將兩個資料集按照某個鍵進行關聯,傳回左側資料集中的所有記錄,以及與之相符的右側資料集中的記錄。本文將詳細介紹Go語言中如何使用Apache Beam進行左連線操作。

問題內容

有沒有簡單的方法可以使用 go 執行 2 個 pcollection 的左連接? 我發現 sql 連線僅在 java 中可用。

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type customer struct {
    CustID int
    FName  string
}

type order struct {
    OrderID int
    Amount  int
    Cust_ID int
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var custList = []customer{
        {1, "Bob"},
        {2, "Adam"},
        {3, "John"},
        {4, "Ben"},
        {5, "Jose"},
        {6, "Bryan"},
        {7, "Kim"},
        {8, "Tim"},
    }

    var orderList = []order{
        {123, 100, 1},
        {125, 30, 3},
        {128, 50, 7},
    }

    custPCol := beam.CreateList(s, custList)

    orderPCol := beam.CreateList(s, orderList)

    // Left Join custPcol with orderPCol
    // Expected Result
    // CustID | FName   |OrderID| Amount
    //     1  | Bob     |   123 | 100
    //     2  | Adam    |       |
    //     3  | John    |   125 | 100
    //     4  | Ben     |       |
    //     5  | Jose    |       |
    //     6  | Bryan   |       |
    //     7  | Kim     |   125 | 100
    //     8  | Tim     |       |

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我想加入這 2 個 pcollection 並執行進一步的操作。我看到了有關 cogroupbykey 的文檔,但無法將其轉換為普通 sql join 可以執行的格式。

對此有什麼建議嗎?

解決方法

嘗試這樣

type resulttype struct {
    custid  int
    fname   string
    orderid int
    amount  int
}

result := beam.pardo(s, func(c customer, iterorder func(*order) bool) resulttype {
    var o order

    for iterorder(&o) {
        if c.custid == o.cust_id {
            return resulttype{
                custid:  c.custid,
                fname:   c.fname,
                orderid: o.orderid,
                amount:  o.amount,
            }
        }
    }

    return resulttype{
        custid: c.custid,
        fname:  c.fname,
    }
}, custpcol, beam.sideinput{input: orderpcol})

或如果您想使用 cogroupbykey ...

custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
    return c.CustID, c
}, custPCol)

orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
    return o.Cust_ID, o
}, orderPCol)

resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)

beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
    c, o := customer{}, order{}
    for custIter(&c) {
        if ok := orderIter(&o); ok {
            fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
        }
        fmt.Println(CustID, c.FName)
    }
}, resultPCol)

以上是Go 中的 Apache Beam 左連接的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除