Maison >développement back-end >Golang >Filtre Apache Beam ParDo dans Go

Filtre Apache Beam ParDo dans Go

王林
王林avant
2024-02-05 11:57:581095parcourir

Go 中的 Apache Beam ParDo 过滤器

Contenu de la question

Je suis un développeur Python mais je devrais utiliser go pour créer un pipeline de flux de données. Je ne trouve pas autant d'exemples Apache Beam utilisant Go par rapport à Python ou Java.

J'ai le code suivant qui a une structure de nom d'utilisateur et d'âge. La tâche consiste à ajouter l'âge, puis à filtrer en fonction de l'âge. J'ai trouvé un moyen d'augmenter l'âge mais je suis bloqué sur la partie filtrage.

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

J'ai essayé de créer une fonction comme ci-dessous, mais cela renvoie une valeur booléenne au lieu de l'objet utilisateur. Je sais qu'il me manque quelque chose de simple mais je n'arrive pas à le comprendre.

func filterage(list user) user {
    return list.age > 40    
}

En python, je peux écrire une fonction comme ci-dessous.

beam.Filter(lambda line: line["Age"] >= 40))

Bonne réponse


Vous devez ajouter un émetteur dans la fonction pour lancer l'utilisateur :

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

Comme écrit dans votre code actuel, 返回 list.age > 40 list.age > 40 Évalue d'abord vrai (un booléen) et renvoie ce booléen.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer