Home  >  Article  >  Backend Development  >  Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

WBOY
WBOYOriginal
2024-07-16 07:23:18429browse

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


Download KisFlow Source

$go get github.com/aceld/kis-flow

KisFlow Developer Documentation

Source Code Example

https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines

If you need the same Flow to run concurrently in multiple Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines, you can use the flow.Fork() function to clone a Flow instance with isolated memory but the same configuration. Each Flow instance can then be executed in different Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines to compute their respective data streams.

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func main() {
    ctx := context.Background()
    // Get a WaitGroup
    var wg sync.WaitGroup

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }
    // Fork the flow
    flowClone1 := flow1.Fork(ctx)

    // Add to WaitGroup
    wg.Add(2)

    // Run Flow1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Run FlowClone1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)

        if err := flowClone1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Wait for Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines to finish
    wg.Wait()

    fmt.Println("All flows completed.")

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

In this code snippet, we start two Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines to run Flow1 and its clone (FlowClone1) concurrently to calculate the final average scores for students 101, 1001, 201, and 2001.


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

The above is the detailed content of Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn