>백엔드 개발 >Golang >Go에서 CloudEvents 사용

Go에서 CloudEvents 사용

Susan Sarandon
Susan Sarandon원래의
2024-12-04 22:40:21947검색

Using CloudEvents in Go

확장성을 높이고 구성요소/서비스 간의 결합을 줄이기 위해 이벤트 중심 아키텍처(EDA)를 채택하는 것은 복잡한 환경에서 비교적 일반적입니다.

이 접근 방식은 여러 문제를 해결하지만 팀이 직면한 과제 중 하나는 이벤트를 표준화하여 모든 구성 요소 간의 호환성을 보장하는 것입니다. 이 문제를 완화하기 위해 CloudEvents 프로젝트를 사용할 수 있습니다.

이 프로젝트는 이벤트를 표준화 및 설명하고 일관성, 접근성 및 이식성을 제공하기 위한 사양을 목표로 합니다. 또 다른 장점은 프로젝트가 사양일 뿐만 아니라 팀 채택을 가속화하기 위해 일련의 SDK를 제공한다는 것입니다.

이 게시물에서는 가상 프로젝트에서 Go SDK(Python SDK의 특별한 모습 포함)를 사용하는 방법을 시연하고 싶습니다.

두 개의 마이크로서비스, 즉 사용자를 관리하는 사용자(CRUD)와 향후 분석을 위해 중요한 이벤트를 환경에 저장하는 감사 서비스로 구성된 환경을 생각해 보겠습니다.

사용자 서비스의 서비스 코드는 다음과 같습니다.

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/protocol"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const auditService = "http://localhost:8080/"

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()
    ceClient, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Set a target.
        ctx := cloudevents.ContextWithTarget(context.Background(), auditService)

        // Send that Event.
        var result protocol.Result
        if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}

코드에서는 다음과 같이 이벤트 생성 및 감사 서비스로의 전송을 볼 수 있습니다.

package main

import (
    "context"
    "fmt"
    "log"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    // The default client is HTTP.
    c, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }
    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

두 서비스를 모두 실행하면 사용자에게 요청을 보내 서비스가 어떻게 작동하는지 확인할 수 있습니다.

curl -X "POST" "http://localhost:3000/v1/user" \
     -H 'Accept: application/json' \
     -H 'Content-Type: application/json' \
     -d $'{
  "name": "Ozzy Osbourne",
  "password": "12345"
}'

사용자 출력은 다음과 같습니다.

{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"}
{"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}

감사 서비스의 출력은 이벤트 수신을 보여줍니다.

❯ go run main.go
Context Attributes,
  specversion: 1.0
  type: user.storeUser
  source: github.com/eminetto/post-cloudevents
  id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6
  time: 2024-11-28T18:53:17.474154Z
  datacontenttype: application/json
Data,
  {
    "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7"
  }

이식성 목표를 검증하기 위해 Python SDK를 사용하여 감사 서비스 버전을 구현했습니다.

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())

    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=8080)

애플리케이션 출력에는 서비스 사용자를 변경할 필요 없이 이벤트 수신이 표시됩니다.

(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py
 * Serving Flask app 'main'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:8080
Press CTRL+C to quit
Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0
127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -

이전 예에서는 CloudEvents SDK를 소개했지만 이벤트 기반 아키텍처의 원칙인 느슨한 결합을 위반했습니다. 애플리케이션 사용자는 감사 애플리케이션을 인식하고 이에 연결되어 있지만 이는 좋은 관행이 아닙니다. 게시/구독과 같은 다른 CloudEvents 기능을 사용하거나 Kafka와 같은 기능을 추가하여 이러한 상황을 개선할 수 있습니다. 다음 예에서는 Kafka를 사용하여 두 애플리케이션을 분리합니다.

첫 번째 단계는 Kafka를 사용하기 위해 하나의 docker-compose.yaml을 만드는 것이었습니다.

services:
  kafka:
    image: bitnami/kafka:latest
    restart: on-failure
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

서비스 사용자가 다음과 같이 변경되었습니다.

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/IBM/sarama"
    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
)

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()

    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer sender.Close(context.Background())

    ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetID(uuid.New().String())
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Send that Event.
        if result := ceClient.Send(
            // Set the producer message key
            kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())),
            event,
        ); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}


주로 Kafka와의 연결을 위해 몇 가지 변경이 필요했지만 이벤트 자체는 변경되지 않았습니다.

감사 서비스에도 비슷한 변경을 했습니다.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/IBM/sarama"

    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
    auditGroupID = "audit-group-id"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer receiver.Close(context.Background())

    c, err := cloudevents.NewClient(receiver)
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

애플리케이션의 출력은 동일하게 유지됩니다.

Kafka를 포함하면서 CloudEvents가 제공하는 이점을 유지하면서 더 이상 EDA 원칙을 위반하지 않고 애플리케이션을 분리했습니다.

이 게시물의 목표는 표준을 소개하고 SDK를 사용한 구현의 용이성을 보여주는 것이었습니다. 주제를 더 깊이 다룰 수 있지만 기술의 목적과 영감을 주는 연구 및 사용을 달성했기를 바랍니다.

이미 CloudEvents를 사용하고 계시거나, 댓글로 경험을 공유하고 싶으시다면 매우 유용할 것입니다.

이 게시물에서 제가 제시한 코드는 GitHub 저장소에서 찾을 수 있습니다.

원본은 2024년 11월 29일 https://eltonminetto.dev에 게시되었습니다.

위 내용은 Go에서 CloudEvents 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.