Maison > Article > développement back-end > Cas (IV) - KisFlow-Golang Stream Real - KisFlow dans les applications Message Queue (MQ)
Github : https://github.com/aceld/kis-flow
Document : https://github.com/aceld/kis-flow/wiki
Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion
Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)
$go get github.com/aceld/kis-flow
Documentation du développeur KisFlow
https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka
Dans cet exemple, nous utilisons github.com/segmentio/kafka-go comme SDK client Kafka tiers (les développeurs peuvent choisir d'autres outils Kafka Go).
package main import ( "context" "fmt" "github.com/aceld/kis-flow/file" "github.com/aceld/kis-flow/kis" "github.com/segmentio/kafka-go" "sync" "time" ) func main() { ctx := context.Background() // Load Configuration from file if err := file.ConfigImportYaml("conf/"); err != nil { panic(err) } // Get the flow flowOrg := kis.Pool().GetFlow("CalStuAvgScore") if flowOrg == nil { panic("flowOrg is nil") } // Create a new Kafka reader reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "SourceStuScore", GroupID: "group1", MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB MaxWait: 500 * time.Millisecond, // Maximum wait time StartOffset: kafka.FirstOffset, }) defer reader.Close() var wg sync.WaitGroup for i := 0; i < 5; i++ { // Use 5 consumers to consume in parallel wg.Add(1) go func() { // Fork a new flow for each consumer flowCopy := flowOrg.Fork(ctx) defer wg.Done() for { // Read a message from Kafka message, err := reader.ReadMessage(ctx) if err != nil { fmt.Printf("error reading message: %v\n", err) break } // Commit the message to the flow _ = flowCopy.CommitRow(string(message.Value)) // Run the flow if err := flowCopy.Run(ctx); err != nil { fmt.Println("err: ", err) return } } }() } wg.Wait() return } func init() { // Register functions kis.Pool().FaaS("VerifyStu", VerifyStu) kis.Pool().FaaS("AvgStuScore", AvgStuScore) kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) }
https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq
Ce consommateur KisFlow utilise github.com/nsqio/go-nsq comme SDK tiers.
package main import ( "context" "fmt" "github.com/aceld/kis-flow/file" "github.com/aceld/kis-flow/kis" "github.com/nsqio/go-nsq" ) func main() { ctx := context.Background() // Load Configuration from file if err := file.ConfigImportYaml("conf/"); err != nil { panic(err) } // Get the flow flowOrg := kis.Pool().GetFlow("CalStuAvgScore") if flowOrg == nil { panic("flowOrg is nil") } // Create a new NSQ consumer config := nsq.NewConfig() config.MaxInFlight = 5 consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config) if err != nil { panic(err) } consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { // Fork a new flow for each message flowCopy := flowOrg.Fork(ctx) // Commit the message to the flow _ = flowCopy.CommitRow(string(message.Body)) // Run the flow if err := flowCopy.Run(ctx); err != nil { fmt.Println("err: ", err) return err } return nil })) err = consumer.ConnectToNSQLookupd("localhost:4161") if err != nil { panic(err) } defer consumer.Stop() select {} } func init() { // Register functions kis.Pool().FaaS("VerifyStu", VerifyStu) kis.Pool().FaaS("AvgStuScore", AvgStuScore) kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) }
https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq
Utilisation de github.com/apache/rocketmq-client-go comme SDK grand public RocketMQ.
package main import ( "context" "fmt" "github.com/aceld/kis-flow/file" "github.com/aceld/kis-flow/kis" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { // Load Configuration from file if err := file.ConfigImportYaml("conf/"); err != nil { panic(err) } // Get the flow myFlow := kis.Pool().GetFlow("CalStuAvgScore") if myFlow == nil { panic("myFlow is nil") } // Create a new RocketMQ consumer c, err := rocketmq.NewPushConsumer( consumer.WithGroupName("group1"), consumer.WithNameServer([]string{"localhost:9876"}), ) if err != nil { panic(err) } err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, msg := range msgs { // Commit the message to the flow _ = myFlow.CommitRow(string(msg.Body)) } // Run the flow if err := myFlow.Run(ctx); err != nil { fmt.Println("err: ", err) return consumer.ConsumeRetryLater, err } return consumer.ConsumeSuccess, nil }) if err != nil { panic(err) } err = c.Start() if err != nil { panic(err) } defer c.Shutdown() select {} }
Auteur : Aceld
GitHub : https://github.com/aceld
Adresse du projet Open Source KisFlow : https://github.com/aceld/kis-flow
Document : https://github.com/aceld/kis-flow/wiki
Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion
Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!