Heim >Backend-Entwicklung >Golang >Transaktionen in Microservices: Teil SAGA-Muster mit Choreografie
Im ersten Artikel dieser Serie haben wir das SAGA-Muster vorgestellt und gezeigt, wie eine minimale Orchestrierung verteilte Transaktionen mit einem zentralen Orchestrator verwalten kann.
Lass uns real werden! Dieses Mal befassen wir uns mit dem Choreografie-Ansatz, bei dem Dienste Arbeitsabläufe koordinieren, indem sie Ereignisse autonom aussenden und konsumieren.
Um dies praktisch zu machen, implementieren wir einen Multi-Service-Workflow für das Gesundheitswesen mit Go und RabbitMQ. Jeder Dienst verfügt über ein eigenes main.go, sodass er einfach unabhängig skaliert, getestet und ausgeführt werden kann.
Choreografie setzt auf dezentrale Kommunikation. Jeder Dienst wartet auf Ereignisse und löst nachfolgende Schritte aus, indem er neue Ereignisse ausgibt. Es gibt keinen zentralen Orchestrator; Der Fluss entsteht aus den Interaktionen einzelner Dienste.
Lassen Sie uns unseren Arbeitsablauf im Gesundheitswesen vom ersten Artikel noch einmal Revue passieren lassen:
Jeder Dienst wird:
Wir verwenden RabbitMQ als Ereigniswarteschlange. Führen Sie es lokal mit Docker aus:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Zugriff auf die RabbitMQ-Verwaltungsoberfläche unter http://localhost:15672 (Benutzername: Gast, Passwort: Gast).
Wir müssen RabbitMQ so konfigurieren, dass es unseren Veranstaltungen gerecht wird. Hier ist eine Beispieldatei init.go zum Einrichten der RabbitMQ-Infrastruktur:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Vollständiger Code hier!
Hinweis:In einer Produktionsumgebung möchten Sie dieses Setup möglicherweise mit einem GitOps-Ansatz verwalten (z. B. mit Terraform) oder jeden Dienst seine eigenen Warteschlangen dynamisch verwalten lassen.
Jeder Dienst wird sein eigenes main.go haben. Wir werden auch Entschädigungsmaßnahmen für den ordnungsgemäßen Umgang mit Fehlern einbeziehen.
Dieser Dienst überprüft Patientendaten und gibt ein PatientVerified-Ereignis aus. Es kompensiert auch, indem es den Patienten benachrichtigt, wenn ein Downstream-Fehler auftritt.
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Dieser Dienst wartet auf PatientVerified und gibt ProcedureScheduled aus. Dies wird dadurch kompensiert, dass der Vorgang abgebrochen wird, wenn ein Downstream-Fehler auftritt.
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Beziehen Sie die Implementierungen von Inventory Service und Billing Service ein und folgen Sie dabei der gleichen Struktur wie oben. Jeder Dienst wartet auf das vorherige Ereignis und gibt das nächste aus, um sicherzustellen, dass eine Kompensationslogik für Fehler vorhanden ist.
Vollständiger Codehier!
RabbitMQ starten:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
Jeden Dienst ausführen:
Öffnen Sie separate Terminals und führen Sie Folgendes aus:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
Ausgabe beobachten:
Jeder Dienst verarbeitet Ereignisse nacheinander und protokolliert den Workflow-Fortschritt.
Lassen Sie es uns aufschlüsseln!
Zunächst einmal implementieren wir für die Zwecke dieses Artikels keine SuppliesReserveFailed und ProcedureScheduleFailed, um übermäßige Komplexität zu vermeiden.
Wir führen folgende Veranstaltungen durch
Schritte (oder Transaktionen):
Vergütungen:
Folgen Sie diesem Implementierungsdiagramm
Dieses Diagramm stellt einen gängigen Ansatz zur Dokumentation von Choreografie dar. Allerdings finde ich es etwas schwierig zu verstehen und etwas frustrierend, insbesondere für diejenigen, die mit der Implementierung oder dem Muster nicht vertraut sind.
Lassen Sie es uns aufschlüsseln!
Das obige Diagramm ist viel ausführlicher und schlüsselt jeden Schritt auf, sodass es leichter zu verstehen ist, was vor sich geht.
Kurz gesagt:
Beachten Sie, dass wir der Kürze halber keine Fehler für die Schritte 1, 4 und 7 implementieren; Der Ansatz wäre jedoch derselbe. Jeder dieser Fehler würde ein Rollback der vorherigen Schritte auslösen.
Beobachtbarkeit ist für das Debuggen und Überwachen verteilter Systeme unerlässlich. Durch die Implementierung von Protokollen, Metriken und Traces wird sichergestellt, dass Entwickler das Systemverhalten verstehen und Probleme effizient diagnostizieren können.
Wir werden uns später in dieser Serie mit der Beobachtbarkeit in der Chorografie befassen, bleiben Sie dran!
Bleiben Sie gespannt auf den nächsten Artikel, in dem wir uns mit der Orchestrierung befassen!
Sehen Sie sich hier das vollständige Repository für diese Serie an. Lasst uns in den Kommentaren diskutieren!
Das obige ist der detaillierte Inhalt vonTransaktionen in Microservices: Teil SAGA-Muster mit Choreografie. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!