Maison >développement back-end >Golang >Utiliser CloudEvents dans Go
L'adoption d'une architecture basée sur les événements (EDA) pour augmenter l'évolutivité et réduire le couplage entre composants/services est relativement courante dans les environnements complexes.
Bien que cette approche résolve un certain nombre de problèmes, l'un des défis auxquels sont confrontées les équipes est de standardiser les événements pour garantir la compatibilité entre tous les composants. Pour atténuer ce défi, nous pouvons utiliser le projet CloudEvents .
Le projet se veut une spécification permettant de normaliser et de décrire les événements, apportant cohérence, accessibilité et portabilité. Un autre avantage est que le projet fournit une série de SDK pour accélérer l'adoption par les équipes en plus d'être une spécification.
Dans cet article, je souhaite démontrer l'utilisation du SDK Go (avec une apparition spéciale du SDK Python ) dans un projet fictif.
Considérons un environnement composé de deux microservices : un utilisateur, qui gère les utilisateurs (CRUD), et un service d'audit, qui stocke les événements importants dans l'environnement pour une analyse future.
Le code de service du service utilisateur est le suivant :
package main import ( "context" "encoding/json" "log" "net/http" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const auditService = "http://localhost:8080/" func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() ceClient, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Set a target. ctx := cloudevents.ContextWithTarget(context.Background(), auditService) // Send that Event. var result protocol.Result if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Dans le code, vous pouvez voir la création d'un événement et son envoi au service d'audit, qui ressemble à ceci :
package main import ( "context" "fmt" "log" cloudevents "github.com/cloudevents/sdk-go/v2" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { // The default client is HTTP. c, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
En exécutant les deux services, vous pouvez voir comment ils fonctionnent en envoyant une requête à l'utilisateur :
curl -X "POST" "http://localhost:3000/v1/user" \ -H 'Accept: application/json' \ -H 'Content-Type: application/json' \ -d $'{ "name": "Ozzy Osbourne", "password": "12345" }'
La sortie utilisateur est :
{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"} {"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}
La sortie du service d'audit démontre la réception de l'événement.
❯ go run main.go Context Attributes, specversion: 1.0 type: user.storeUser source: github.com/eminetto/post-cloudevents id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6 time: 2024-11-28T18:53:17.474154Z datacontenttype: application/json Data, { "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7" }
Pour valider l'objectif de portabilité, j'ai utilisé le SDK Python pour implémenter une version du service d'audit :
from flask import Flask, request from cloudevents.http import from_http app = Flask(__name__) # create an endpoint at http://localhost:/3000/ @app.route("/", methods=["POST"]) def home(): # create a CloudEvent event = from_http(request.headers, request.get_data()) # you can access cloudevent fields as seen below print( f"Found {event['id']} from {event['source']} with type " f"{event['type']} and specversion {event['specversion']}" ) return "", 204 if __name__ == "__main__": app.run(port=8080)
Le résultat de l'application affiche la réception de l'événement sans qu'il soit nécessaire de modifier l'utilisateur du service :
(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py * Serving Flask app 'main' * Debug mode: off WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on http://127.0.0.1:8080 Press CTRL+C to quit Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0 127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -
L'exemple précédent présente les SDK CloudEvents, mais il viole un principe des architectures basées sur les événements : desserrer le couplage. L'utilisateur de l'application connaît l'application d'audit et y est lié, ce qui n'est pas une bonne pratique. Nous pouvons améliorer cette situation en utilisant d'autres fonctionnalités de CloudEvents, telles que pub/sub, ou en ajoutant quelque chose comme Kafka. L'exemple suivant utilise Kafka pour découpler les deux applications.
La première étape consistait à créer un docker-compose.yaml pour utiliser Kafka :
services: kafka: image: bitnami/kafka:latest restart: on-failure ports: - 9092:9092 environment: - KAFKA_CFG_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_NUM_PARTITIONS=3 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes
Le changement suivant concerne l'utilisateur du service :
package main import ( "context" "encoding/json" "log" "net/http" "time" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" ) func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer sender.Close(context.Background()) ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetID(uuid.New().String()) event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Send that Event. if result := ceClient.Send( // Set the producer message key kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())), event, ); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Quelques changements ont été nécessaires, principalement pour établir un lien avec Kafka, mais l'événement en lui-même n'a pas changé.
J'ai apporté un changement similaire au service d'audit :
package main import ( "context" "fmt" "log" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" auditGroupID = "audit-group-id" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer receiver.Close(context.Background()) c, err := cloudevents.NewClient(receiver) if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
Le résultat des applications reste le même.
Avec l'inclusion de Kafka, nous avons découplé les applications, ne violant plus les principes de l'EDA tout en conservant les avantages apportés par CloudEvents.
L'objectif de cet article était de présenter la norme et de démontrer la facilité de mise en œuvre à l'aide des SDK. Je pourrais aborder le sujet plus en profondeur, mais j'espère avoir atteint l'objectif et inspiré la recherche et l'utilisation de la technologie.
Ce serait très utile si vous utilisez/avez déjà utilisé CloudEvents et que vous souhaitez partager vos expériences dans les commentaires.
Vous pouvez retrouver les codes que j'ai présentés dans cet article dans le référentiel sur GitHub.
Publié à l'origine sur https://eltonminetto.dev le 29 novembre 2024.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!