Maison >développement back-end >Golang >Cas (IV) - KisFlow-Golang Stream Real - KisFlow dans les applications Message Queue (MQ)

Cas (IV) - KisFlow-Golang Stream Real - KisFlow dans les applications Message Queue (MQ)

WBOY
WBOYoriginal
2024-07-18 03:44:47927parcourir

Case (IV) - KisFlow-Golang Stream Real- KisFlow in Message Queue (MQ) Applications

Github : https://github.com/aceld/kis-flow
Document : https://github.com/aceld/kis-flow/wiki


Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion


Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)

Télécharger KisFlow Source

$go get github.com/aceld/kis-flow

Documentation du développeur KisFlow

KisFlow avec Kafka

Exemple de code source

https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

Dans cet exemple, nous utilisons github.com/segmentio/kafka-go comme SDK client Kafka tiers (les développeurs peuvent choisir d'autres outils Kafka Go).

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // Maximum wait time
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // Use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // Fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow avec Nsq

Exemple de code source :

https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

Ce consommateur KisFlow utilise github.com/nsqio/go-nsq comme SDK tiers.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // Fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow avec RocketMQ

Exemple de code source :

https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

Utilisation de github.com/apache/rocketmq-client-go comme SDK grand public RocketMQ.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFlow := kis.Pool().GetFlow("CalStuAvgScore")
    if myFlow == nil {
        panic("myFlow is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFlow.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFlow.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}

Auteur : Aceld
GitHub : https://github.com/aceld

Adresse du projet Open Source KisFlow : https://github.com/aceld/kis-flow

Document : https://github.com/aceld/kis-flow/wiki


Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion


Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn