>백엔드 개발 >Golang >마이크로서비스의 트랜잭션: 안무가 포함된 SAGA 패턴 부분

마이크로서비스의 트랜잭션: 안무가 포함된 SAGA 패턴 부분

Barbara Streisand
Barbara Streisand원래의
2025-01-23 02:05:08407검색

이 시리즈의 첫 번째 기사에서는 SAGA 패턴을 소개하고 최소한의 오케스트레이션을 통해 중앙 오케스트레이터를 사용하여 분산 트랜잭션을 관리하는 방법을 시연했습니다.

진짜로 해보자! 이번에는 서비스가 자동으로 이벤트를 내보내고 소비하여 워크플로를 조정하는 안무 접근 방식에 대해 살펴보겠습니다.

이를 실용화하기 위해 Go와 RabbitMQ를 사용하여 다중 서비스 의료 워크플로우를 구현하겠습니다. 각 서비스에는 자체 main.go가 있으므로 독립적으로 쉽게 확장, 테스트 및 실행할 수 있습니다.

SAGA 안무란 무엇인가요?

안무는 분산된 커뮤니케이션에 의존합니다. 각 서비스는 이벤트를 수신하고 새 이벤트를 내보내 후속 단계를 트리거합니다. 중앙 오케스트레이터가 없습니다. 흐름은 개별 서비스의 상호작용에서 나옵니다.

주요 이점:

  • 분리된 서비스: 각 서비스는 독립적으로 운영됩니다.
  • 확장성: 이벤트 기반 시스템은 높은 로드를 효율적으로 처리합니다.
  • 유연성: 새 서비스를 추가하는 데 워크플로 논리를 변경할 필요가 없습니다.

과제:

  • 디버깅 복잡성: 여러 서비스에서 이벤트를 추적하는 것은 까다로울 수 있습니다. (이 주제에 대한 기사를 쓸 예정입니다. 기대해 주세요!)
  • 인프라 설정: 서비스에는 모든 점을 연결하기 위해 강력한 메시지 브로커(예: RabbitMQ)가 필요합니다.
  • 이벤트 폭풍: 잘못 설계된 워크플로는 이벤트로 인해 시스템을 압도할 수 있습니다.

실제 예: 의료 워크플로우

첫 번째 기사에서 다룬 의료 업무 흐름을 다시 살펴보겠습니다.

  1. 환자 서비스: 환자 세부 정보 및 보험 적용 범위를 확인합니다.
  2. 스케줄러 서비스: 시술 일정을 예약합니다.
  3. 재고 서비스: 의약품을 비축합니다.
  4. 청구 서비스: 청구를 처리합니다.

각 서비스는 다음을 수행합니다.

  • RabbitMQ를 사용하여 특정 이벤트를 수신합니다.
  • 새 이벤트를 내보내 후속 단계를 트리거합니다.

Docker를 사용하여 RabbitMQ 설정

RabbitMQ를 이벤트 대기열로 사용하겠습니다. Docker를 사용하여 로컬로 실행하세요.

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management

http://localhost:15672(사용자 이름: guest, 비밀번호: guest)에서 RabbitMQ 관리 인터페이스에 액세스합니다.

교환, 대기열 및 바인딩 설정

이벤트를 수용하려면 RabbitMQ를 구성해야 합니다. 다음은 RabbitMQ 인프라 설정을 위한 init.go 파일의 예입니다.

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}

전체 코드는 여기!

참고: 프로덕션 설정에서는 GitOps 접근 방식(예: Terraform 사용)을 사용하여 이 설정을 관리하거나 각 서비스가 자체 대기열을 동적으로 처리하도록 할 수 있습니다.

구현: 서비스 파일

각 서비스에는 고유한 main.go가 있습니다. 실패를 적절하게 처리하기 위한 보상 조치도 포함할 예정입니다.

1. 환자 서비스

이 서비스는 환자 세부 정보를 확인하고 PatientVerified 이벤트를 생성합니다. 또한 다운스트림 오류가 발생하면 환자에게 알려서 보상합니다.

docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management

2. 스케줄러 서비스

이 서비스는 PatientVerified를 수신하고 ProcedureScheduled를 내보냅니다. 다운스트림 장애가 발생하면 절차를 취소하여 보상합니다.

package main

import (
    "log"

    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare an exchange: %v", err)
    }

    _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil)
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil)
    if err != nil {
        log.Fatalf("Failed to bind a queue: %v", err)
    }
}

부가서비스

위와 동일한 구조에 따라 인벤토리 서비스 및 청구 서비스 구현을 포함합니다. 각 서비스는 이전 이벤트를 수신하고 다음 이벤트를 내보내 오류에 대한 보상 논리가 마련되어 있는지 확인합니다.

전체코드여기


워크플로 실행

RabbitMQ 시작:

// patient/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[PatientService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled")
            if err := notifyProcedureScheduleCancellation(); err != nil {
                log.Fatalf("Failed to notify patient: %v", err)
            }
        }
    }()

    common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified")
    fmt.Println("[PatientService] Event published: PatientVerified")

    select {}
}

func notifyProcedureScheduleCancellation() error {
    fmt.Println("Compensation: Notify patient of procedure cancellation.")
    return nil
}

각 서비스 실행:
별도의 터미널을 열고 다음을 실행하세요.

// scheduler/main.go
package main

import (
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"
    "github.com/thegoodapi/saga_tutorial/choreography/common"
)

func main() {
    conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    go func() {
        fmt.Println("[SchedulerService] Waiting for events...")
        msgs, err := common.ConsumeEvent(ch, "PatientVerified")
        if err != nil {
            log.Fatalf("Failed to consume event: %v", err)
        }

        for range msgs {
            fmt.Println("[SchedulerService] Processing event: PatientVerified")
            if err := scheduleProcedure(); err != nil {
                common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure")
                fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed")
            } else {
                common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully")
                fmt.Println("[SchedulerService] Event published: ProcedureScheduled")
            }
        }
    }()

    select {}
}

func scheduleProcedure() error {
    fmt.Println("Step 2: Scheduling procedure...")
    return nil // or simulate a failure
}

출력 관찰:
각 서비스는 이벤트를 순차적으로 처리하여 워크플로 진행 상황을 기록합니다.

무슨 일이에요?

분해해보자!

우선, 이 기사의 목적에 따라 불필요한 복잡성을 피하기 위해 SuppliesReserveFailed 및 ProcedureScheduleFailed를 구현하지 않습니다.

다음과 같은 이벤트를 진행합니다

단계(또는 거래):

  • T1: (init): 환자 확인
  • T2: 절차 예정
  • T3: 소모품 예약
  • T4: 결제 성공

보상:

  • C4: 결제 실패
  • C3: 예약된 공급품 출시됨
  • C2: 절차 일정이 취소되었습니다
  • C1: NotifyFailureToUser(구현되지 않음)

다음 구현 다이어그램

high-level implementation flow

이 다이어그램은 안무를 기록하는 일반적인 접근 방식을 나타냅니다. 하지만 특히 구현이나 패턴에 익숙하지 않은 분들에게는 다소 이해하기 어렵고 다소 답답함을 느낍니다.

분해해보자!

detailed implementation flow

위의 다이어그램은 훨씬 더 장황하며 각 단계를 세분화하여 진행 상황을 더 쉽게 이해할 수 있도록 합니다.

간단히 말하면:

  1. 환자 서비스에서 환자 세부 정보를 성공적으로 확인했습니다
  2. 환자 서비스 방출 PatientVerified
  3. 스케줄러 서비스는 소비 PatientVerified
  4. 스케줄러 서비스는 약속을 성공적으로 예약합니다
  5. 스케줄러 서비스 내보내기 ProcedureScheduled
  6. 재고 서비스 소비 절차예정
  7. 재고 서비스에서 소모품을 성공적으로 예약했습니다
  8. 재고 서비스 배출 소모품예약
  9. 빌링 서비스 소비 SuppliesReserved
  10. 빌링 서비스 고객에게 요금 청구 실패 보상 시작
  11. 결제 서비스 내보내기 BillingFailed
  12. 인벤토리 서비스 소비 BillingFailed
  13. Inventory Service는 7단계에서 예약된 소모품을 출시합니다
  14. 재고 서비스는 배출 예약된 공급품을 출시합니다.
  15. 스케줄러 서비스는 소비 ReservedSuppliesReleased
  16. 스케줄러 서비스는 4단계에서 예약된 약속을 삭제합니다
  17. 스케줄러 서비스가 내보냅니다 ProcedureScheduleCancelled
  18. 환자 서비스 소비 절차일정 취소됨
  19. 환자 서비스에서 고객에게 오류를 알립니다

간결함을 위해 1, 4, 7단계에서는 실패를 구현하지 않습니다. 그러나 접근 방식은 동일합니다. 이러한 각 실패로 인해 이전 단계의 롤백이 시작됩니다.


관찰 가능성

분산 시스템을 디버깅하고 모니터링하려면 관찰 가능성이 필수적입니다. 로그, 측정항목, 추적을 구현하면 개발자가 시스템 동작을 이해하고 문제를 효율적으로 진단할 수 있습니다.

벌채 반출

  • 구조화된 로깅(예: JSON 형식)을 사용하여 이벤트와 메타데이터를 캡처합니다.
  • 서비스 전반의 워크플로를 추적하려면 로그에 상관 관계 ID를 포함하세요.

측정항목

  • 큐 크기 및 이벤트 처리 시간을 모니터링합니다.
  • Prometheus와 같은 도구를 사용하여 측정항목을 수집하고 시각화하세요.

트레이싱

  • 분산 추적(예: OpenTelemetry 사용)을 구현하여 서비스 전체에서 이벤트를 추적합니다.
  • 더 나은 통찰력을 위해 관련 데이터(예: 이벤트 이름, 타임스탬프)로 범위에 주석을 추가합니다.

이 시리즈 후반부에서 안무의 관찰 가능성에 대해 자세히 알아보겠습니다. 계속 지켜봐주세요!


주요 시사점

  • 분산형 제어: 안무를 통해 자율적인 협업이 가능합니다.
  • 이벤트 기반 단순성: RabbitMQ는 메시지 교환을 단순화합니다.
  • 확장 가능한 아키텍처: 새로운 서비스를 원활하게 추가할 수 있습니다.
  • 안무는 처음에는 매우 과격할 수 있지만 언제나 그렇듯이 연습을 하면 완벽 더 좋아질 수 있습니다!

오케스트레이션을 살펴보는 다음 기사를 기대해 주세요!

여기에서 이 시리즈의 전체 저장소를 확인하세요. 댓글로 토론해보자!

위 내용은 마이크로서비스의 트랜잭션: 안무가 포함된 SAGA 패턴 부분의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.