Cas (II) - Calcul en temps réel KisFlow-Golang Stream - Fonctionnement parallèle en flux

Case (II) - KisFlow-Golang Stream Real-Time Computing - Flow Parallel Operation

Github :
Document :

Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion

Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)

Télécharger KisFlow Source

$go get

Documentation du développeur KisFlow

Exemple de code source

KisFlow peut réaliser la combinaison de deux flux via un connecteur

En utilisant la combinaison des deux flux suivants, cette introduction couvrira l'interface et l'utilisation du Connecteur.

Diagramme de flux de données

Flow Diagram

Présentation du cas

Supposons qu'un élève possède quatre attributs :

Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3

Définissez Flow1 : CalStuAvgScore-1-2 pour calculer le score moyen d'un élève du crédit 1 (score_1) et du crédit 2 (score_2) (avg_score_1_2).
Définissez Flow2 : CalStuAvgScore-3 pour calculer le score moyen d'un élève pour le crédit 3 (score_3) et avg_score_1_2, qui est la moyenne du crédit 1, du crédit 2 et du crédit 3. La moyenne du crédit 1 et du crédit 2 est fournie par Flow1.


Flow1 se compose de 4 fonctions :

V (Fonction : VerifyStu) pour vérifier la validité de StuId
C (Fonction : AvgStuScore12) pour calculer le score moyen du Crédit 1 et du Crédit 2
S (Fonction : SaveScoreAvg12) pour stocker avg_score_1_2 dans Redis
E (Fonction : PrintStuAvgScore) pour imprimer le score moyen du Crédit 1 et du Crédit 2.


Flow2 se compose de 4 fonctions :

V (Fonction : VerifyStu) pour vérifier la validité de StuId
L (Fonction : LoadScoreAvg12) pour lire le score moyen de l'élève actuel du crédit 1 et du crédit 2 (avg_score_1_2) calculé par Flow1
C (Fonction : AvgStuScore3) pour calculer le score moyen du Crédit 3 et le score moyen du Crédit 1 et du Crédit 2
E (Fonction : PrintStuAvgScore) pour imprimer le score moyen du crédit 1, du crédit 2 et du crédit 3.


kistype: func
fname: AvgStuScore3
fmode: Calculate
    name: SourceStuScore
        - stu_id


kistype: func
fname: LoadScoreAvg12
fmode: Load
    name: SourceStuScore
        - stu_id
    cname: Score12Cache

Protocole de données de base


package main

type StuScore1_2 struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`

type StuScoreAvg struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`

type StuScore3 struct {
    StuId      int     `json:"stu_id"`
    AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
    Score3     int     `json:"score_3"`

Initialisation du connecteur

Le Connecteur défini dans ce projet, Score12Cache, est une ressource de lien associée à Redis. Ce connecteur nécessite une méthode d'initialisation pour établir une connexion au démarrage de KisFlow.


package main

import (

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitScore12Cache")

    // init Redis Conn Client
    rdb := redis.NewClient(&redis.Options{
        Addr:     connector.GetConfig().AddrString, // Redis-Server address
        Password: "",                               // password
        DB:       0,                                // select db

    // Ping test
    pong, err := rdb.Ping(context.Background()).Result()
    if err != nil {
        log.Logger().ErrorF("Failed to connect to Redis: %v", err)
        return err
    fmt.Println("Connected to Redis:", pong)

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

    return nil

Ici, l'instance Redis connectée avec succès est stockée dans la variable de cache "rdb" du connecteur.

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

Implémentation FaaS

Fonction (V) : VérifierStu


package main

import (

type VerifyStuIn struct {
    StuId int `json:"stu_id"`

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
    fmt.Printf("->Call Func VerifyStu\n")

    for _, stu := range rows {
        // Filter out invalid data
        if stu.StuId < 0 || stu.StuId > 999 {
            // Terminate the current Flow process, subsequent functions of the current Flow will not be executed
            return flow.Next(kis.ActionAbort)

    return flow.Next(kis.ActionDataReuse)

VerifyStu() est utilisé pour valider les données. Si les données ne répondent pas aux exigences, le flux de données en cours est interrompu. Enfin, les données sont réutilisées et transmises à la couche suivante via flow.Next(kis.ActionDataReuse).

Fonction (C) : AvgStuScore12


package main

import (

type AvgStuScoreIn_1_2 struct {

type AvgStuScoreOut_1_2 struct {

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
    fmt.Printf("->Call Func AvgStuScore12\n")

    for _, row := range rows {

        out := AvgStuScoreOut_1_2{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2) / 2,

        // Submit result data
        _ = flow.CommitRow(out)

    return flow.Next()

AvgStuScore12() calcule le score moyen de score_1 et score_2, ce qui donne avg_score.

Fonction(s) : SaveScoreAvg12


package main

import (

type SaveStuScoreIn struct {

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // Set data to redis
    pipe := rdb.Pipeline()

    for _, score := range rows {
        // make key
        key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

        pipe.HMSet(context.Background(), key, map[string]interface{}{
            "avg_score": score.AvgScore,

    _, err := pipe.Exec(ctx)
    if err != nil {
        return err

    return nil

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
    fmt.Printf("->Call Func SaveScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("SaveScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err

    if BatchSetStuScores(ctx, conn, rows) != nil {
        fmt.Printf("SaveScoreAvg12(): BatchSetStuScores err = %s\n", err.Error())
        return err

    return flow.Next(kis.ActionDataReuse)

SaveScoreAvg12() stocke les données dans Redis via le connecteur lié, en utilisant la clé configurée dans le connecteur. Enfin, les données sources sont transmises de manière transparente à la fonction suivante.

Fonction (E) : PrintStuAvgScore


package main

import (

type PrintStuAvgScoreIn struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)

    return flow.Next()

PrintStuAvgScore() imprime le score moyen de l'élève actuel.

Fonction (L) : LoadScoreAvg12


package main

import (

type LoadStuScoreIn struct {

type LoadStuScoreOut struct {

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // make key
    key := conn.GetConfig().Key + strconv.Itoa(stuId)

    // get data from redis
    result, err := rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return 0, err

    // get value
    avgScoreStr, ok := result["avg_score"]
    if !ok {
        return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)

    // parse to float64
    avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
    if err != nil {
        return 0, err

    return avgScore, nil

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
    fmt.Printf("->Call Func LoadScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err

    for _, row := range rows {
        stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
        if err != nil {
            fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
            return err

        out := LoadStuScoreOut{
            StuScore3: StuScore3{
                StuId:      row.StuId,
                Score3:     row.Score3,
                AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)

        // commit result
        _ = flow.CommitRow(out)

    return flow.Next()

LoadScoreAvg12() reads the average score of score_1 and score_2 from Redis through the linked resource Redis of the bound Connector using the key configured in the Connector. It then sends the source data from upstream, along with the newly read average score of score1 and score2, to the next layer.

Function(C): AvgStuScore3


package main

import (

type AvgStuScore3In struct {

type AvgStuScore3Out struct {

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
    fmt.Printf("->Call Func AvgStuScore3\n")

    for _, row := range rows {

        out := AvgStuScore3Out{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,

        // Submit result data
        _ = flow.CommitRow(out)

    return flow.Next()

AvgStuScore3() recalculates the average score of three scores by adding score_3 and the average score of score_1 and score_2, resulting in the final average score avg_score.

Register FaaS & CaaSInit/CaaS (Register Function/Connector)


func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
    kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
    kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
    kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

    // Register connectors
    kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)

Main Process


package main

import (

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err

    return nil

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err

    return nil

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {

    var wg sync.WaitGroup

    go func() {
        // Run flow1 concurrently
        defer wg.Done()

        flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
        if flow1 == nil {
            panic("flow1 is nil")

        if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {

    go func() {
        // Run flow2 concurrently
        defer wg.Done()

        flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
        if flow2 == nil {
            panic("flow2 is nil")

        if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {



Two Goroutines are launched concurrently to execute Flow1 and Flow2, calculating the final average scores for student 101 and student 102.

Execution Results

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreAvg12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore3
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]

In Flow[CalStuAvgScore3], we observe the final computed average scores for scores 1, 2, and 3.

Author: Aceld

KisFlow Open Source Project Address:


