Maison >développement back-end >Golang >Utiliser CloudEvents dans Go

Utiliser CloudEvents dans Go

Susan Sarandon
Susan Sarandonoriginal
2024-12-04 22:40:21958parcourir

Using CloudEvents in Go

L'adoption d'une architecture basée sur les événements (EDA) pour augmenter l'évolutivité et réduire le couplage entre composants/services est relativement courante dans les environnements complexes.

Bien que cette approche résolve un certain nombre de problèmes, l'un des défis auxquels sont confrontées les équipes est de standardiser les événements pour garantir la compatibilité entre tous les composants. Pour atténuer ce défi, nous pouvons utiliser le projet CloudEvents .

Le projet se veut une spécification permettant de normaliser et de décrire les événements, apportant cohérence, accessibilité et portabilité. Un autre avantage est que le projet fournit une série de SDK pour accélérer l'adoption par les équipes en plus d'être une spécification.

Dans cet article, je souhaite démontrer l'utilisation du SDK Go (avec une apparition spéciale du SDK Python ) dans un projet fictif.

Considérons un environnement composé de deux microservices : un utilisateur, qui gère les utilisateurs (CRUD), et un service d'audit, qui stocke les événements importants dans l'environnement pour une analyse future.

Le code de service du service utilisateur est le suivant :

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/protocol"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const auditService = "http://localhost:8080/"

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()
    ceClient, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Set a target.
        ctx := cloudevents.ContextWithTarget(context.Background(), auditService)

        // Send that Event.
        var result protocol.Result
        if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}

Dans le code, vous pouvez voir la création d'un événement et son envoi au service d'audit, qui ressemble à ceci :

package main

import (
    "context"
    "fmt"
    "log"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    // The default client is HTTP.
    c, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }
    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

En exécutant les deux services, vous pouvez voir comment ils fonctionnent en envoyant une requête à l'utilisateur :

curl -X "POST" "http://localhost:3000/v1/user" \
     -H 'Accept: application/json' \
     -H 'Content-Type: application/json' \
     -d $'{
  "name": "Ozzy Osbourne",
  "password": "12345"
}'

La sortie utilisateur est :

{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"}
{"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}

La sortie du service d'audit démontre la réception de l'événement.

❯ go run main.go
Context Attributes,
  specversion: 1.0
  type: user.storeUser
  source: github.com/eminetto/post-cloudevents
  id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6
  time: 2024-11-28T18:53:17.474154Z
  datacontenttype: application/json
Data,
  {
    "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7"
  }

Pour valider l'objectif de portabilité, j'ai utilisé le SDK Python pour implémenter une version du service d'audit :

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())

    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=8080)

Le résultat de l'application affiche la réception de l'événement sans qu'il soit nécessaire de modifier l'utilisateur du service :

(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py
 * Serving Flask app 'main'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:8080
Press CTRL+C to quit
Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0
127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -

L'exemple précédent présente les SDK CloudEvents, mais il viole un principe des architectures basées sur les événements : desserrer le couplage. L'utilisateur de l'application connaît l'application d'audit et y est lié, ce qui n'est pas une bonne pratique. Nous pouvons améliorer cette situation en utilisant d'autres fonctionnalités de CloudEvents, telles que pub/sub, ou en ajoutant quelque chose comme Kafka. L'exemple suivant utilise Kafka pour découpler les deux applications.

La première étape consistait à créer un docker-compose.yaml pour utiliser Kafka :

services:
  kafka:
    image: bitnami/kafka:latest
    restart: on-failure
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

Le changement suivant concerne l'utilisateur du service :

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/IBM/sarama"
    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
)

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()

    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer sender.Close(context.Background())

    ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetID(uuid.New().String())
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Send that Event.
        if result := ceClient.Send(
            // Set the producer message key
            kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())),
            event,
        ); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}


Quelques changements ont été nécessaires, principalement pour établir un lien avec Kafka, mais l'événement en lui-même n'a pas changé.

J'ai apporté un changement similaire au service d'audit :

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/IBM/sarama"

    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
    auditGroupID = "audit-group-id"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer receiver.Close(context.Background())

    c, err := cloudevents.NewClient(receiver)
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

Le résultat des applications reste le même.

Avec l'inclusion de Kafka, nous avons découplé les applications, ne violant plus les principes de l'EDA tout en conservant les avantages apportés par CloudEvents.

L'objectif de cet article était de présenter la norme et de démontrer la facilité de mise en œuvre à l'aide des SDK. Je pourrais aborder le sujet plus en profondeur, mais j'espère avoir atteint l'objectif et inspiré la recherche et l'utilisation de la technologie.

Ce serait très utile si vous utilisez/avez déjà utilisé CloudEvents et que vous souhaitez partager vos expériences dans les commentaires.

Vous pouvez retrouver les codes que j'ai présentés dans cet article dans le référentiel sur GitHub.

Publié à l'origine sur https://eltonminetto.dev le 29 novembre 2024.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn