Rumah >pembangunan bahagian belakang >Golang >Menggunakan CloudEvents dalam Go
Mengguna pakai seni bina dipacu peristiwa (EDA) untuk meningkatkan kebolehskalaan dan mengurangkan gandingan antara komponen/perkhidmatan adalah perkara biasa dalam persekitaran yang kompleks.
Walaupun pendekatan ini menyelesaikan beberapa masalah, salah satu cabaran yang dihadapi oleh pasukan ialah menyeragamkan acara untuk memastikan keserasian antara semua komponen. Untuk mengurangkan cabaran ini, kami boleh menggunakan projek CloudEvents.
Projek ini bertujuan untuk menjadi spesifikasi untuk menyeragamkan dan menerangkan acara, membawa ketekalan, kebolehaksesan dan mudah alih. Kelebihan lain ialah projek ini menyediakan satu siri SDK untuk mempercepatkan penggunaan pasukan selain menjadi spesifikasi.
Dalam siaran ini, saya ingin menunjukkan penggunaan Go SDK (dengan penampilan istimewa oleh Python SDK ) dalam projek rekaan.
Mari kita pertimbangkan persekitaran yang terdiri daripada dua perkhidmatan mikro: pengguna, yang mengurus pengguna (CRUD) dan perkhidmatan audit, yang menyimpan peristiwa penting dalam persekitaran untuk analisis masa hadapan.
Kod perkhidmatan perkhidmatan pengguna adalah seperti berikut:
package main import ( "context" "encoding/json" "log" "net/http" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const auditService = "http://localhost:8080/" func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() ceClient, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Set a target. ctx := cloudevents.ContextWithTarget(context.Background(), auditService) // Send that Event. var result protocol.Result if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Dalam kod, anda boleh melihat penciptaan acara dan penghantarannya kepada perkhidmatan audit, yang kelihatan seperti ini:
package main import ( "context" "fmt" "log" cloudevents "github.com/cloudevents/sdk-go/v2" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { // The default client is HTTP. c, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
Dengan menjalankan kedua-dua perkhidmatan, anda boleh melihat cara ia berfungsi dengan menghantar permintaan kepada pengguna :
curl -X "POST" "http://localhost:3000/v1/user" \ -H 'Accept: application/json' \ -H 'Content-Type: application/json' \ -d $'{ "name": "Ozzy Osbourne", "password": "12345" }'
Output pengguna ialah:
{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"} {"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}
Output perkhidmatan audit menunjukkan penerimaan acara.
❯ go run main.go Context Attributes, specversion: 1.0 type: user.storeUser source: github.com/eminetto/post-cloudevents id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6 time: 2024-11-28T18:53:17.474154Z datacontenttype: application/json Data, { "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7" }
Untuk mengesahkan matlamat mudah alih, saya menggunakan Python SDK untuk melaksanakan versi perkhidmatan audit:
from flask import Flask, request from cloudevents.http import from_http app = Flask(__name__) # create an endpoint at http://localhost:/3000/ @app.route("/", methods=["POST"]) def home(): # create a CloudEvent event = from_http(request.headers, request.get_data()) # you can access cloudevent fields as seen below print( f"Found {event['id']} from {event['source']} with type " f"{event['type']} and specversion {event['specversion']}" ) return "", 204 if __name__ == "__main__": app.run(port=8080)
Output aplikasi menunjukkan penerimaan acara tanpa memerlukan perubahan kepada pengguna perkhidmatan:
(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py * Serving Flask app 'main' * Debug mode: off WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on http://127.0.0.1:8080 Press CTRL+C to quit Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0 127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -
Contoh sebelumnya memperkenalkan CloudEvents SDK, tetapi ia melanggar prinsip seni bina berasaskan acara: longgarkan gandingan. Pengguna aplikasi mengetahui dan terikat dengan aplikasi pengauditan, yang bukan amalan yang baik. Kami boleh memperbaiki keadaan ini dengan menggunakan ciri CloudEvents lain, seperti pub/sub, atau dengan menambahkan sesuatu seperti Kafka. Contoh berikut menggunakan Kafka untuk memisahkan dua aplikasi.
Langkah pertama ialah membuat satu docker-compose.yaml untuk menggunakan Kafka:
services: kafka: image: bitnami/kafka:latest restart: on-failure ports: - 9092:9092 environment: - KAFKA_CFG_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_NUM_PARTITIONS=3 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes
Perubahan berikut adalah dalam pengguna perkhidmatan:
package main import ( "context" "encoding/json" "log" "net/http" "time" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" ) func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer sender.Close(context.Background()) ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetID(uuid.New().String()) event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Send that Event. if result := ceClient.Send( // Set the producer message key kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())), event, ); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Beberapa perubahan diperlukan, terutamanya untuk membuat sambungan dengan Kafka, tetapi acara itu sendiri tidak berubah.
Saya membuat perubahan yang serupa kepada perkhidmatan audit:
package main import ( "context" "fmt" "log" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" auditGroupID = "audit-group-id" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer receiver.Close(context.Background()) c, err := cloudevents.NewClient(receiver) if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
Output aplikasi kekal sama.
Dengan kemasukan Kafka, kami memisahkan aplikasi, tidak lagi melanggar prinsip EDA sambil mengekalkan kelebihan yang disediakan oleh CloudEvents.
Matlamat siaran ini adalah untuk memperkenalkan standard dan menunjukkan kemudahan pelaksanaan menggunakan SDK. Saya boleh membincangkan subjek dengan lebih mendalam, tetapi saya berharap saya telah mencapai objektif dan penyelidikan yang diilhamkan dan penggunaan teknologi.
Adalah sangat berguna jika anda sudah menggunakan/telah menggunakan CloudEvents dan ingin berkongsi pengalaman anda dalam ulasan.
Anda boleh menemui kod yang saya bentangkan dalam siaran ini dalam repositori di GitHub.
Diterbitkan pada asalnya di https://eltonminetto.dev pada 29 Nov 2024.
Atas ialah kandungan terperinci Menggunakan CloudEvents dalam Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!