Heim  >  Artikel  >  Backend-Entwicklung  >  Apache Beam hat sich in Go eingefügt

Apache Beam hat sich in Go eingefügt

WBOY
WBOYnach vorne
2024-02-11 09:12:07428Durchsuche

Go 中的 Apache Beam 左连接

php-Editor Xiaoxin ist hier, um Ihnen den Apache Beam Left Join in der Go-Sprache vorzustellen. Apache Beam ist ein verteiltes Datenverarbeitungs-Framework, das ein gemeinsames Programmiermodell für die Ausführung von Batch- und Stream-Verarbeitungsaufgaben auf verschiedenen verteilten Datenverarbeitungs-Engines bereitstellt. Der Left Join ist eine übliche Datenverarbeitungsoperation. Er kann zwei Datensätze gemäß einem bestimmten Schlüssel verknüpfen und alle Datensätze im linken Datensatz und die übereinstimmenden Datensätze im rechten Datensatz zurückgeben. In diesem Artikel wird detailliert beschrieben, wie Sie mit Apache Beam Left-Join-Operationen in der Go-Sprache ausführen.

Frageninhalt

Gibt es eine einfache Möglichkeit, mit go einen Left-Join von 2 pcollections durchzuführen? Ich habe festgestellt, dass die SQL-Verbindung nur in Java verfügbar ist.

package main

import (
    "context"
    "flag"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type customer struct {
    CustID int
    FName  string
}

type order struct {
    OrderID int
    Amount  int
    Cust_ID int
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var custList = []customer{
        {1, "Bob"},
        {2, "Adam"},
        {3, "John"},
        {4, "Ben"},
        {5, "Jose"},
        {6, "Bryan"},
        {7, "Kim"},
        {8, "Tim"},
    }

    var orderList = []order{
        {123, 100, 1},
        {125, 30, 3},
        {128, 50, 7},
    }

    custPCol := beam.CreateList(s, custList)

    orderPCol := beam.CreateList(s, orderList)

    // Left Join custPcol with orderPCol
    // Expected Result
    // CustID | FName   |OrderID| Amount
    //     1  | Bob     |   123 | 100
    //     2  | Adam    |       |
    //     3  | John    |   125 | 100
    //     4  | Ben     |       |
    //     5  | Jose    |       |
    //     6  | Bryan   |       |
    //     7  | Kim     |   125 | 100
    //     8  | Tim     |       |

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

Ich möchte diesen beiden Sammlungen beitreten und weitere Vorgänge durchführen. Ich habe die Dokumentation für cogroupbykey gesehen, kann sie jedoch nicht in ein Format konvertieren, das mit einem normalen SQL-Join möglich ist.

Irgendwelche Vorschläge dazu?

Lösung

Probieren Sie es aus

type resulttype struct {
    custid  int
    fname   string
    orderid int
    amount  int
}

result := beam.pardo(s, func(c customer, iterorder func(*order) bool) resulttype {
    var o order

    for iterorder(&o) {
        if c.custid == o.cust_id {
            return resulttype{
                custid:  c.custid,
                fname:   c.fname,
                orderid: o.orderid,
                amount:  o.amount,
            }
        }
    }

    return resulttype{
        custid: c.custid,
        fname:  c.fname,
    }
}, custpcol, beam.sideinput{input: orderpcol})

Oder wenn Sie cogroupbykey verwenden möchten...

custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
    return c.CustID, c
}, custPCol)

orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
    return o.Cust_ID, o
}, orderPCol)

resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)

beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
    c, o := customer{}, order{}
    for custIter(&c) {
        if ok := orderIter(&o); ok {
            fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
        }
        fmt.Println(CustID, c.FName)
    }
}, resultPCol)

Das obige ist der detaillierte Inhalt vonApache Beam hat sich in Go eingefügt. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen