在本系列的第一篇文章中,我们介绍了SAGA 模式,并演示了最小的编排如何使用中央编排器管理分布式事务。
让我们面对现实吧!这次,我们将深入探讨编排方法,其中服务通过自主发出和消费事件来协调工作流程。
为了使其切实可行,我们将使用 Go 和 RabbitMQ 实现多服务医疗保健工作流程。每个服务都有自己的 main.go,使其易于扩展、测试和独立运行。
什么是SAGA编排?
编排依赖于去中心化的沟通。每个服务都会侦听事件并通过发出新事件来触发后续步骤。没有中央协调者;该流程源自各个服务的交互。
主要优点:
- 解耦服务:每个服务独立运行。
- 可扩展性:事件驱动系统有效地处理高负载。
- 灵活性:添加新服务不需要更改工作流程逻辑。
挑战:
- 调试复杂性:跨多个服务跟踪事件可能很棘手。 (我会写一篇专门讨论这个话题的文章,敬请期待!)
- 基础设施设置:服务需要强大的消息代理(例如 RabbitMQ)来连接所有点。
- 事件风暴:设计不当的工作流程可能会导致系统因事件而不堪重负。
实例:医疗保健工作流程
让我们回顾一下第一篇文章中的医疗保健工作流程:
- 患者服务:验证患者详细信息和保险范围。
- 调度服务: 安排程序。
- 库存服务:储备医疗用品。
- 计费服务:处理计费。
每项服务将:
- 使用 RabbitMQ 监听特定事件。
- 发出新事件来触发后续步骤。
使用 Docker 设置 RabbitMQ
我们将使用 RabbitMQ 作为事件队列。使用 Docker 在本地运行它:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
访问RabbitMQ管理界面:http://localhost:15672(用户名:guest,密码:guest)。
交换、队列和绑定设置
我们需要配置 RabbitMQ 来适应我们的事件。以下是用于设置 RabbitMQ 基础设施的 init.go 文件示例:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
完整代码在这里!
注意:在生产环境中,您可能希望使用 GitOps 方法(例如,使用 Terraform)来管理此设置,或者让每个服务动态处理自己的队列。
实施:服务文件
每个服务都有自己的 main.go。我们还将包括优雅地处理失败的补偿措施。
1.病人服务
该服务验证患者详细信息并发出 PatientVerified 事件。如果发生下游故障,它还会通过通知患者进行补偿。
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
2.调度服务
此服务侦听 PatientVerified 并发出 procedureScheduled。如果发生下游故障,它会通过取消程序来进行补偿。
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
附加服务
包括库存服务和计费服务实现,遵循与上面相同的结构。每个服务都会监听前一个事件并发出下一个事件,确保针对失败的补偿逻辑到位。
完整代码在这里!
运行工作流程
启动 RabbitMQ:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
运行每个服务:
打开单独的终端并运行:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
观察输出:
每个服务按顺序处理事件,记录工作流程进度。
发生了什么?
让我们来分解一下!
首先,就本文而言,我们不会实现SuppliesReserveFailed 和ProcedureScheduleFailed,l 以避免不必要的复杂性。
我们正在实施以下活动
步骤(或交易):
- T1:(初始化):患者已验证
- T2:已安排程序
- T3:补给预留
- T4:计费成功
补偿:
- C4:计费失败
- C3:保留供应品已释放
- C2:程序安排已取消
- C1:NotifyFailureToUser(未实现)
按照这个实现图
该图代表了记录编排的常见方法。然而,我发现它有点难以理解并且有点令人沮丧,特别是对于那些不熟悉实现或模式的人。
让我们来分解一下!
上图更加冗长,它分解了每个步骤,使您更容易理解发生了什么。
简而言之:
- 患者服务已成功验证患者详细信息
- 患者服务发出PatientVerified
- 调度程序服务消耗PatientVerified
- 预约服务预约成功
- 调度程序服务发出ProcedureScheduled
- 库存服务消耗ProcedureScheduled
- 库存服务成功储备货源
- 库存服务发出供应品保留
- 计费服务消耗SuppliesReserved
- 计费服务无法向客户收费并开始补偿
- 计费服务发出 BillingFailed
- 库存服务消耗 BillingFailed
- 库存服务释放第 7 步中保留的物资
- 库存服务发出ReservedSuppliesReleased
- 调度程序服务消耗ReservedSuppliesReleased
- 调度程序服务删除步骤 4 中安排的约会
- 调度程序服务发出ProcedureScheduleCancelled
- 患者服务消耗ProcedureScheduleCancelled
- 患者服务人员通知客户错误
请注意,为了简洁起见,我们没有实现步骤 1、4 和 7 的失败;然而,方法是相同的。每一次失败都会触发前面步骤的回滚。
可观察性
可观察性对于调试和监控分布式系统至关重要。实施日志、指标和跟踪可确保开发人员能够了解系统行为并有效诊断问题。
记录
- 使用结构化日志记录(例如 JSON 格式)来捕获事件和元数据。
- 在日志中包含相关 ID 以跟踪跨服务的工作流程。
指标
- 监控队列大小和事件处理时间。
- 使用 Prometheus 等工具来收集和可视化指标。
追踪
- 实施分布式跟踪(例如,使用 OpenTelemetry)来跟踪跨服务的事件。
- 使用相关数据(例如事件名称、时间戳)对范围进行注释,以获得更好的见解。
我们将在本系列后面深入探讨编舞中的可观察性,敬请期待!
要点
- 分散控制:编排可实现自主协作。
- 事件驱动的简单性: RabbitMQ 简化了消息交换。
- 可扩展架构:无缝添加新服务。
-
编舞一开始可能会让人不知所措,但一如既往:练习会让你
完美更好!
请继续关注下一篇文章,我们将探索编排!
在此处查看本系列的完整存储库。评论里一起讨论吧!
以上是微服务中的事务:SAGA 模式与编排的一部分的详细内容。更多信息请关注PHP中文网其他相关文章!

C 更适合需要直接控制硬件资源和高性能优化的场景,而Golang更适合需要快速开发和高并发处理的场景。1.C 的优势在于其接近硬件的特性和高度的优化能力,适合游戏开发等高性能需求。2.Golang的优势在于其简洁的语法和天然的并发支持,适合高并发服务开发。

Golang在实际应用中表现出色,以简洁、高效和并发性着称。 1)通过Goroutines和Channels实现并发编程,2)利用接口和多态编写灵活代码,3)使用net/http包简化网络编程,4)构建高效并发爬虫,5)通过工具和最佳实践进行调试和优化。

Go语言的核心特性包括垃圾回收、静态链接和并发支持。1.Go语言的并发模型通过goroutine和channel实现高效并发编程。2.接口和多态性通过实现接口方法,使得不同类型可以统一处理。3.基本用法展示了函数定义和调用的高效性。4.高级用法中,切片提供了动态调整大小的强大功能。5.常见错误如竞态条件可以通过gotest-race检测并解决。6.性能优化通过sync.Pool重用对象,减少垃圾回收压力。

Go语言在构建高效且可扩展的系统中表现出色,其优势包括:1.高性能:编译成机器码,运行速度快;2.并发编程:通过goroutines和channels简化多任务处理;3.简洁性:语法简洁,降低学习和维护成本;4.跨平台:支持跨平台编译,方便部署。

关于SQL查询结果排序的疑惑学习SQL的过程中,常常会遇到一些令人困惑的问题。最近,笔者在阅读《MICK-SQL基础�...

golang ...

Go语言中如何对比并处理三个结构体在Go语言编程中,有时需要对比两个结构体的差异,并将这些差异应用到第�...


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

MinGW - 适用于 Windows 的极简 GNU
这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

安全考试浏览器
Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Dreamweaver Mac版
视觉化网页开发工具