>백엔드 개발 >Golang >사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용

사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용

WBOY
WBOY원래의
2024-07-16 07:23:18534검색

Case (III) - KisFlow-Golang Stream Real- Application of KisFlow in Multi-사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용

Github: https://github.com/aceld/kis-flow
문서: https://github.com/aceld/kis-flow/wiki


1부-개요
Part2.1-프로젝트 구성 / 기본 모듈
Part2.2-프로젝트 구성 / 기본 모듈
Part3-데이터 스트림
Part4-기능 스케줄링
5부-커넥터
Part6-구성 가져오기 및 내보내기
Part7-KisFlow 액션
Part8-캐시/매개변수 데이터 캐싱 및 데이터 매개변수
Part9-흐름의 다중 복사본
Part10-프로메테우스 지표 통계
Part11 - Reflection을 기반으로 한 FaaS 매개변수 유형의 적응적 등록


사례1-빠른 시작
Case2-Flow 병렬운전
사례3 - 다중 고루틴에 KisFlow 적용
Message Queue(MQ) 애플리케이션의 Case4-KisFlow


KisFlow 소스 다운로드

$go get github.com/aceld/kis-flow

KisFlow 개발자 문서

소스 코드 예

https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines

여러 고루틴에서 동시에 실행하기 위해 동일한 Flow가 필요한 경우 flow.Fork() 함수를 사용하여 격리된 메모리를 사용하지만 동일한 구성으로 Flow 인스턴스를 복제할 수 있습니다. 그러면 각 Flow 인스턴스를 서로 다른 고루틴에서 실행하여 해당 데이터 스트림을 계산할 수 있습니다.

사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func main() {
    ctx := context.Background()
    // Get a WaitGroup
    var wg sync.WaitGroup

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }
    // Fork the flow
    flowClone1 := flow1.Fork(ctx)

    // Add to WaitGroup
    wg.Add(2)

    // Run Flow1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Run FlowClone1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)

        if err := flowClone1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Wait for 사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용 to finish
    wg.Wait()

    fmt.Println("All flows completed.")

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

이 코드 조각에서는 두 개의 Goroutine을 시작하여 Flow1과 해당 클론(FlowClone1)을 동시에 실행하여 학생 101, 1001, 201 및 2001의 최종 평균 점수를 계산합니다.


저자: 아셀드
GitHub: https://github.com/aceld

KisFlow 오픈소스 프로젝트 주소: https://github.com/aceld/kis-flow

문서: https://github.com/aceld/kis-flow/wiki


1부-개요
Part2.1-프로젝트 구성 / 기본 모듈
Part2.2-프로젝트 구성 / 기본 모듈
Part3-데이터 스트림
Part4-기능 스케줄링
5부-커넥터
Part6-구성 가져오기 및 내보내기
Part7-KisFlow 액션
Part8-캐시/매개변수 데이터 캐싱 및 데이터 매개변수
Part9-흐름의 다중 복사본
Part10-프로메테우스 지표 통계
Part11 - Reflection을 기반으로 한 FaaS 매개변수 유형의 적응적 등록


사례1-빠른 시작
Case2-Flow 병렬운전
사례3 - 다중 고루틴에 KisFlow 적용
Message Queue(MQ) 애플리케이션의 Case4-KisFlow

위 내용은 사례 (III) - KisFlow-Golang Stream Real- 다중 고루틴에서 KisFlow 적용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.