>백엔드 개발 >Golang >사례 (IV) - KisFlow-Golang Stream Real- 메시지 큐(MQ) 애플리케이션의 KisFlow

사례 (IV) - KisFlow-Golang Stream Real- 메시지 큐(MQ) 애플리케이션의 KisFlow

WBOY
WBOY원래의
2024-07-18 03:44:47927검색

Case (IV) - KisFlow-Golang Stream Real- KisFlow in Message Queue (MQ) Applications

Github: https://github.com/aceld/kis-flow
문서: https://github.com/aceld/kis-flow/wiki


1부-개요
Part2.1-프로젝트 구성 / 기본 모듈
Part2.2-프로젝트 구성 / 기본 모듈
Part3-데이터 스트림
Part4-기능 스케줄링
5부-커넥터
Part6-구성 가져오기 및 내보내기
Part7-KisFlow 액션
Part8-캐시/매개변수 데이터 캐싱 및 데이터 매개변수
Part9-흐름의 다중 복사본
Part10-프로메테우스 지표 통계
Part11 - Reflection을 기반으로 한 FaaS 매개변수 유형의 적응적 등록


사례1-빠른 시작
Case2-Flow 병렬운전
사례3 - 다중 고루틴에 KisFlow 적용
Message Queue(MQ) 애플리케이션의 Case4-KisFlow

KisFlow 소스 다운로드

$go get github.com/aceld/kis-flow

KisFlow 개발자 문서

Kafka를 사용한 KisFlow

샘플 소스 코드

https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

이 예에서는 github.com/segmentio/kafka-go를 타사 Kafka 클라이언트 SDK로 사용합니다(개발자는 다른 Kafka Go 도구를 선택할 수 있음).

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // Maximum wait time
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // Use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // Fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Nsq를 사용한 KisFlow

샘플 소스 코드:

https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

이 KisFlow 소비자는 타사 SDK로 github.com/nsqio/go-nsq를 사용합니다.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // Fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

RocketMQ를 사용한 KisFlow

샘플 소스 코드:

https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

RocketMQ 소비자 SDK로 github.com/apache/rocketmq-client-go 사용

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFlow := kis.Pool().GetFlow("CalStuAvgScore")
    if myFlow == nil {
        panic("myFlow is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFlow.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFlow.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}

저자: 아셀드
GitHub: https://github.com/aceld

KisFlow 오픈소스 프로젝트 주소: https://github.com/aceld/kis-flow

문서: https://github.com/aceld/kis-flow/wiki


1부-개요
Part2.1-프로젝트 구성 / 기본 모듈
Part2.2-프로젝트 구성 / 기본 모듈
Part3-데이터 스트림
Part4-기능 스케줄링
5부-커넥터
Part6-구성 가져오기 및 내보내기
Part7-KisFlow 액션
Part8-캐시/매개변수 데이터 캐싱 및 데이터 매개변수
Part9-흐름의 다중 복사본
Part10-프로메테우스 지표 통계
Part11 - Reflection을 기반으로 한 FaaS 매개변수 유형의 적응적 등록


사례1-빠른 시작
Case2-Flow 병렬운전
사례3 - 다중 고루틴에 KisFlow 적용
Message Queue(MQ) 애플리케이션의 Case4-KisFlow

위 내용은 사례 (IV) - KisFlow-Golang Stream Real- 메시지 큐(MQ) 애플리케이션의 KisFlow의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.