Maison  >  Article  >  développement back-end  >  Apache Beam sélectionne les N premières lignes de PCollection dans Go

Apache Beam sélectionne les N premières lignes de PCollection dans Go

PHPz
PHPzavant
2024-02-10 17:48:08523parcourir

Apache Beam 从 Go 中的 PCollection 中选择前 N 行

Apache Beam est un framework de traitement de données distribué open source qui fournit un modèle de programmation unifié pouvant s'exécuter sur différents moteurs de traitement par lots et par flux. Récemment, une fonctionnalité très utile a été ajoutée au SDK Go d'Apache Beam : la sélection des N premières lignes d'une PCollection. Cette fonctionnalité est très utile pour les scénarios dans lesquels de grands ensembles de données doivent être échantillonnés ou prévisualisés rapidement. Dans cet article, nous expliquerons comment utiliser cette fonctionnalité dans le SDK Go d'Apache Beam et montrerons des exemples de code pratiques. commençons!

Contenu de la question

J'ai une pcollection dans laquelle je dois sélectionner les n plus grandes lignes. J'essaie de créer un pipeline de flux de données en utilisant Go et je suis bloqué.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

À partir du code ci-dessus, je dois sélectionner les 5 premières lignes en fonction de user.age J'ai trouvé le lien en haut du package qui a la même fonctionnalité mais il indique qu'il renvoie un seul élément pcollection. Quelle est la différence?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

J'ai ajouté la fonction pour sélectionner les 5 premières lignes comme ci-dessus mais j'ai eu une erreur []main.user is not allocate to main.user

J'ai besoin de la collection dans le même format qu'avant car je dois la traiter davantage. Je soupçonne que c'est parce que la fonction top.largest renvoie un seul élément pcollection. Des idées sur la façon de convertir le format ?

Solution

La meilleure collection est []user

Alors essayez-le...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer