首页 >后端开发 >Golang >案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用

案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用

WBOY
WBOY原创
2024-07-16 07:23:18533浏览

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用

Github:https://github.com/aceld/kis-flow
文档:https://github.com/aceld/kis-flow/wiki


第 1 部分-概览
Part2.1-项目构建/基础模块
Part2.2-项目构建/基础模块
第三部分-数据流
Part4-功能调度
第5部分-连接器
Part6-配置导入导出
Part7-KisFlow 动作
Part8-Cache/Params 数据缓存和数据参数
Part9-多份流程
Part10-Prometheus Metrics 统计
Part11-基于反射的FaaS参数类型自适应注册


案例1-快速入门
Case2-Flow并行操作
Case3-KisFlow在多Goroutine中的应用
案例4-消息队列(MQ)应用中的KisFlow


下载 KisFlow 源代码

$go get github.com/aceld/kis-flow

KisFlow 开发者文档

源代码示例

https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines

如果需要同一个Flow在多个Goroutine中并发运行,可以使用flow.Fork()函数克隆一个内存隔离但配置相同的Flow实例。然后,每个 Flow 实例可以在不同的 Goroutine 中执行,以计算各自的数据流。

案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func main() {
    ctx := context.Background()
    // Get a WaitGroup
    var wg sync.WaitGroup

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }
    // Fork the flow
    flowClone1 := flow1.Fork(ctx)

    // Add to WaitGroup
    wg.Add(2)

    // Run Flow1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Run FlowClone1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)

        if err := flowClone1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Wait for 案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用 to finish
    wg.Wait()

    fmt.Println("All flows completed.")

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

在此代码片段中,我们启动两个 案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用 来同时运行 Flow1 及其克隆 (FlowClone1),以计算学生 101、1001、201 和 2001 的最终平均分数。


作者:Aceld
GitHub:https://github.com/aceld

KisFlow开源项目地址:https://github.com/aceld/kis-flow

文档:https://github.com/aceld/kis-flow/wiki


第 1 部分-概览
Part2.1-项目构建/基础模块
Part2.2-项目构建/基础模块
第三部分-数据流
Part4-功能调度
第5部分-连接器
Part6-配置导入导出
Part7-KisFlow 动作
Part8-Cache/Params 数据缓存和数据参数
Part9-多份流程
Part10-Prometheus Metrics 统计
Part11-基于反射的FaaS参数类型自适应注册


案例1-快速入门
Case2-Flow并行操作
Case3-KisFlow在多Goroutine中的应用
案例4-消息队列(MQ)应用中的KisFlow

以上是案例(三)-KisFlow-Golang Stream实战-KisFlow在多Goroutine中的应用的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn