在本系列的第一篇文章中,我們介紹了SAGA 模式,並示範了最小的編排如何使用中央編排器管理分散式事務。
讓我們面對現實吧!這次,我們將深入探討編排方法,其中服務透過自主發出和消費事件來協調工作流程。
為了使其切實可行,我們將使用 Go 和 RabbitMQ 實現多服務醫療保健工作流程。每個服務都有自己的 main.go,使其易於擴展、測試和獨立運行。
什麼是SAGA編排?
編排依賴去中心化的溝通。每個服務都會偵聽事件並透過發出新事件來觸發後續步驟。沒有中央協調者;流程源自於各個服務的互動。
主要優點:
- 解耦服務:每個服務獨立運作。
- 可擴充性:事件驅動系統有效處理高負載。
- 彈性:新增服務不需要更改工作流程邏輯。
挑戰:
- 偵錯複雜性:跨多個服務追蹤事件可能很棘手。 (我會寫一篇專門討論這個主題的文章,敬請期待!)
- 基礎架構設定:服務需要強大的訊息代理程式(例如 RabbitMQ)來連接所有點。
- 事件風暴:設計不當的工作流程可能會導致系統因事件而不堪負荷。
實例:醫療保健工作流程
讓我們回顧一下第一篇文章中的醫療保健工作流程:
- 病患服務:驗證病患詳細資料和保險範圍。
- 調度服務: 安排程序。
- 庫存服務:儲備醫療用品。
- 計費服務:處理計費。
每項服務將:
- 使用 RabbitMQ 監聽特定事件。
- 發出新事件來觸發後續步驟。
使用 Docker 設定 RabbitMQ
我們將使用 RabbitMQ 作為事件佇列。使用 Docker 在本地運行它:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
存取RabbitMQ管理介面:http://localhost:15672(使用者名稱:guest,密碼:guest)。
交換、佇列和綁定設置
我們需要設定 RabbitMQ 來適應我們的事件。以下是用於設定 RabbitMQ 基礎架構的 init.go 檔案範例:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
完整程式碼在這裡!
注意:在生產環境中,您可能想要使用 GitOps 方法(例如,使用 Terraform)來管理此設置,或讓每個服務動態處理自己的佇列。
實施:服務文件
每個服務都有自己的 main.go。我們還將包括優雅地處理失敗的補償措施。
1.病人服務
此服務驗證患者詳細資料並發出 PatientVerified 事件。如果發生下游故障,它也會透過通知患者進行補償。
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
2.調度服務
此服務偵聽 PatientVerified 並發出 procedureScheduled。如果發生下游故障,它會透過取消程序來進行補償。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
附加服務
包括庫存服務和計費服務實現,遵循與上面相同的結構。每個服務都會監聽前一個事件並發出下一個事件,確保針對失敗的補償邏輯到位。
完整程式碼在這裡!
運行工作流程
啟動 RabbitMQ:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
執行每個服務:
打開單獨的終端並運行:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
觀察輸出:
每個服務依序處理事件,記錄工作流程進度。
發生了什麼事?
讓我們來分解一下吧!
首先,就本文而言,我們不會實作SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的複雜性。
我們正在實施以下活動
步驟(或交易):
- T1:(初始化):患者已驗證
- T2:已排定程序
- T3:補給預留
- T4:計費成功
補償:
- C4:計費失敗
- C3:保留供應品已釋放
- C2:程式安排已取消
- C1:NotifyFailureToUser(未實作)
按照這個實現圖
該圖代表了記錄編排的常見方法。然而,我發現它有點難以理解並且有點令人沮喪,特別是對於那些不熟悉實現或模式的人。
讓我們來分解一下吧!
上圖更加冗長,它分解了每個步驟,使您更容易理解發生了什麼。
簡而言之:
- 患者服務已成功驗證患者詳細資訊
- 病患服務發出PatientVerified
- 調度程序服務消耗PatientVerified
- 預約服務預約成功
- 調度程序服務發出ProcedureScheduled
- 庫存服務消耗ProcedureScheduled
- 庫存服務成功儲備貨源
- 庫存服務發出供應品保留
- 計費服務消耗SuppliesReserved
- 計費服務無法向客戶收費並開始補償
- 計費服務發出 BillingFailed
- 庫存服務消耗 BillingFailed
- 庫存服務釋放第 7 步中保留的物資
- 庫存服務發出ReservedSuppliesReleased
- 調度程序服務消耗ReservedSuppliesReleased
- 調度程序服務刪除步驟 4 中安排的約會
- 調度程序服務發出ProcedureScheduleCancelled
- 病患服務消耗ProcedureScheduleCancelled
- 病患服務人員通知客戶錯誤
請注意,為了簡潔起見,我們沒有實現步驟 1、4 和 7 的失敗;然而,方法是相同的。每一次失敗都會觸發前面步驟的回滾。
可觀察性
可觀察性對於除錯和監控分散式系統至關重要。實作日誌、指標和追蹤可確保開發人員能夠了解系統行為並有效診斷問題。
記錄
- 使用結構化日誌記錄(例如 JSON 格式)來擷取事件和元資料。
- 在日誌中包含相關 ID 以追蹤跨服務的工作流程。
指標
- 監控佇列大小和事件處理時間。
- 使用 Prometheus 等工具來收集和視覺化指標。
追蹤
- 實作分散式追蹤(例如,使用 OpenTelemetry)來追蹤跨服務的事件。
- 使用相關資料(例如事件名稱、時間戳記)對範圍進行註釋,以獲得更好的見解。
我們將在本系列後面深入探討編舞中的可觀察性,敬請期待!
重點
- 分散控制:編排可自主協作。
- 事件驅動的簡單性: RabbitMQ 簡化了訊息交換。
- 可擴充架構:無縫新增服務。
-
編舞一開始可能會讓人不知所措,但一如既往:練習會讓你
完美更好!
請繼續關注下一篇文章,我們將探索編排!
在此處查看本系列的完整儲存庫。評論裡一起討論吧!
以上是微服務中的事務:SAGA 模式與編排的一部分的詳細內容。更多資訊請關注PHP中文網其他相關文章!

C 更適合需要直接控制硬件資源和高性能優化的場景,而Golang更適合需要快速開發和高並發處理的場景。 1.C 的優勢在於其接近硬件的特性和高度的優化能力,適合遊戲開發等高性能需求。 2.Golang的優勢在於其簡潔的語法和天然的並發支持,適合高並發服務開發。

Golang在实际应用中表现出色,以简洁、高效和并发性著称。1)通过Goroutines和Channels实现并发编程,2)利用接口和多态编写灵活代码,3)使用net/http包简化网络编程,4)构建高效并发爬虫,5)通过工具和最佳实践进行调试和优化。

Go語言的核心特性包括垃圾回收、靜態鏈接和並發支持。 1.Go語言的並發模型通過goroutine和channel實現高效並發編程。 2.接口和多態性通過實現接口方法,使得不同類型可以統一處理。 3.基本用法展示了函數定義和調用的高效性。 4.高級用法中,切片提供了動態調整大小的強大功能。 5.常見錯誤如競態條件可以通過gotest-race檢測並解決。 6.性能優化通過sync.Pool重用對象,減少垃圾回收壓力。

Go語言在構建高效且可擴展的系統中表現出色,其優勢包括:1.高性能:編譯成機器碼,運行速度快;2.並發編程:通過goroutines和channels簡化多任務處理;3.簡潔性:語法簡潔,降低學習和維護成本;4.跨平台:支持跨平台編譯,方便部署。

關於SQL查詢結果排序的疑惑學習SQL的過程中,常常會遇到一些令人困惑的問題。最近,筆者在閱讀《MICK-SQL基礎�...

golang ...

Go語言中如何對比並處理三個結構體在Go語言編程中,有時需要對比兩個結構體的差異,並將這些差異應用到第�...


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

MantisBT
Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用