이 시리즈의 첫 번째 기사에서는 SAGA 패턴을 소개하고 최소한의 오케스트레이션을 통해 중앙 오케스트레이터를 사용하여 분산 트랜잭션을 관리하는 방법을 시연했습니다.
진짜로 해보자! 이번에는 서비스가 자동으로 이벤트를 내보내고 소비하여 워크플로를 조정하는 안무 접근 방식에 대해 살펴보겠습니다.
이를 실용화하기 위해 Go와 RabbitMQ를 사용하여 다중 서비스 의료 워크플로우를 구현하겠습니다. 각 서비스에는 자체 main.go가 있으므로 독립적으로 쉽게 확장, 테스트 및 실행할 수 있습니다.
안무는 분산된 커뮤니케이션에 의존합니다. 각 서비스는 이벤트를 수신하고 새 이벤트를 내보내 후속 단계를 트리거합니다. 중앙 오케스트레이터가 없습니다. 흐름은 개별 서비스의 상호작용에서 나옵니다.
첫 번째 기사에서 다룬 의료 업무 흐름을 다시 살펴보겠습니다.
각 서비스는 다음을 수행합니다.
RabbitMQ를 이벤트 대기열로 사용하겠습니다. Docker를 사용하여 로컬로 실행하세요.
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
http://localhost:15672(사용자 이름: guest, 비밀번호: guest)에서 RabbitMQ 관리 인터페이스에 액세스합니다.
이벤트를 수용하려면 RabbitMQ를 구성해야 합니다. 다음은 RabbitMQ 인프라 설정을 위한 init.go 파일의 예입니다.
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
전체 코드는 여기!
참고: 프로덕션 설정에서는 GitOps 접근 방식(예: Terraform 사용)을 사용하여 이 설정을 관리하거나 각 서비스가 자체 대기열을 동적으로 처리하도록 할 수 있습니다.
각 서비스에는 고유한 main.go가 있습니다. 실패를 적절하게 처리하기 위한 보상 조치도 포함할 예정입니다.
이 서비스는 환자 세부 정보를 확인하고 PatientVerified 이벤트를 생성합니다. 또한 다운스트림 오류가 발생하면 환자에게 알려서 보상합니다.
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
이 서비스는 PatientVerified를 수신하고 ProcedureScheduled를 내보냅니다. 다운스트림 장애가 발생하면 절차를 취소하여 보상합니다.
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
위와 동일한 구조에 따라 인벤토리 서비스 및 청구 서비스 구현을 포함합니다. 각 서비스는 이전 이벤트를 수신하고 다음 이벤트를 내보내 오류에 대한 보상 논리가 마련되어 있는지 확인합니다.
전체코드여기
RabbitMQ 시작:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
각 서비스 실행:
별도의 터미널을 열고 다음을 실행하세요.
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
출력 관찰:
각 서비스는 이벤트를 순차적으로 처리하여 워크플로 진행 상황을 기록합니다.
분해해보자!
우선, 이 기사의 목적에 따라 불필요한 복잡성을 피하기 위해 SuppliesReserveFailed 및 ProcedureScheduleFailed를 구현하지 않습니다.
다음과 같은 이벤트를 진행합니다
단계(또는 거래):
보상:
다음 구현 다이어그램
이 다이어그램은 안무를 기록하는 일반적인 접근 방식을 나타냅니다. 하지만 특히 구현이나 패턴에 익숙하지 않은 분들에게는 다소 이해하기 어렵고 다소 답답함을 느낍니다.
분해해보자!
위의 다이어그램은 훨씬 더 장황하며 각 단계를 세분화하여 진행 상황을 더 쉽게 이해할 수 있도록 합니다.
간단히 말하면:
간결함을 위해 1, 4, 7단계에서는 실패를 구현하지 않습니다. 그러나 접근 방식은 동일합니다. 이러한 각 실패로 인해 이전 단계의 롤백이 시작됩니다.
분산 시스템을 디버깅하고 모니터링하려면 관찰 가능성이 필수적입니다. 로그, 측정항목, 추적을 구현하면 개발자가 시스템 동작을 이해하고 문제를 효율적으로 진단할 수 있습니다.
이 시리즈 후반부에서 안무의 관찰 가능성에 대해 자세히 알아보겠습니다. 계속 지켜봐주세요!
오케스트레이션을 살펴보는 다음 기사를 기대해 주세요!
여기에서 이 시리즈의 전체 저장소를 확인하세요. 댓글로 토론해보자!
위 내용은 마이크로서비스의 트랜잭션: 안무가 포함된 SAGA 패턴 부분의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!