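Welcome back to our series on implementing a sophisticated order processing system! In the previous post, we laid the foundation for the project: we set up a basic CRUD API, integrated it with a Postgres database, and implemented a simple Temporal workflow. Today, we dive deeper into Temporal workflows to build a robust, scalable order processing system.
In this post, we’ll significantly expand our use of Temporal, exploring advanced concepts and implementing complex workflows. We’ll cover multi-step workflow design, long-running processes, retries and error handling, workflow versioning, the saga pattern, monitoring, and testing.
Let’s get started!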
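Before we start coding, let’s review some key Temporal concepts that will be crucial for our advanced implementation.
In Temporal, a Workflow is a durable function that orchestrates long-running business logic. Workflows are fault-tolerant and can survive process and machine failures. They can be thought of as a reliable coordination mechanism for your application’s state transitions.
Activities, on the other hand, are the building blocks of a workflow. They represent a single, well-defined action or task, such as making an API call, writing to a database, or sending an email. Activities can be retried independently of the workflow that invokes them.
When a workflow executes, Temporal maintains a history of all the events that occur during its lifetime. This history is the source of truth for the workflow’s state. If a workflow worker fails and restarts, it can reconstruct the workflow’s state by replaying this history.
This event-sourcing approach allows Temporal to provide strong consistency guarantees and enables features such as workflow versioning and continue-as-new.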
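To make the replay idea concrete, here is a minimal plain-Go sketch that rebuilds state by folding over a list of events. It does not use the Temporal SDK, and the event names and the `orderState` type are invented for illustration; real Temporal histories record lower-level events, and the SDK replays the workflow code against them.

```go
package main

import "fmt"

// event is one entry in a simplified, hypothetical workflow history.
type event struct {
	kind string // e.g. "OrderValidated"
}

// orderState is the state a worker rebuilds from the history.
type orderState struct {
	validated, paid, shipped bool
}

// replay folds the events, in order, into a fresh state.
func replay(history []event) orderState {
	var s orderState
	for _, e := range history {
		switch e.kind {
		case "OrderValidated":
			s.validated = true
		case "PaymentProcessed":
			s.paid = true
		case "ShippingArranged":
			s.shipped = true
		}
	}
	return s
}

func main() {
	history := []event{{"OrderValidated"}, {"PaymentProcessed"}}
	// prints {validated:true paid:true shipped:false}
	fmt.Printf("%+v\n", replay(history))
}
```

Because the state is derived only from the recorded events, a restarted worker that replays the same history arrives at the same state.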
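Temporal is designed to handle processes that run for extended periods, from minutes to days or even months. It provides mechanisms such as heartbeats for long-running activities and continue-as-new for workflows that generate large histories.
As your system evolves, you may need to update workflow definitions. Temporal provides versioning capabilities that let you make non-breaking changes to workflows without affecting running instances.
The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful when you need to maintain consistency across multiple services without using distributed ACID transactions. Temporal provides an excellent framework for implementing sagas.
Now that we’ve covered these concepts, let’s start implementing our advanced order processing workflow.
Let’s design a multi-step order processing workflow that includes order validation, payment processing, inventory management, and shipping arrangement. We’ll implement each step as a separate activity coordinated by the workflow.
First, let’s define our activities: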
// internal/workflow/activities.go
package workflow

import (
	"context"
	"errors"

	"go.temporal.io/sdk/activity"

	"github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
	queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
	return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
	// Implement order validation logic
	if order.TotalAmount <= 0 {
		return errors.New("invalid order amount")
	}
	// Add more validation as needed
	return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
	// Implement payment processing logic
	// This could involve calling a payment gateway API
	activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
	// Simulate payment processing
	// In a real scenario, you'd integrate with a payment gateway here
	return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
	// Implement inventory update logic
	// This could involve updating stock levels in the database
	activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
	// Simulate inventory update
	// In a real scenario, you'd update your inventory management system here
	return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
	// Implement shipping arrangement logic
	// This could involve calling a shipping provider's API
	activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
	// Simulate shipping arrangement
	// In a real scenario, you'd integrate with a shipping provider here
	return nil
}
Now, let’s implement the order processing workflow:
// internal/workflow/order_workflow.go
package workflow

import (
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"

	"github.com/yourusername/order-processing-system/internal/db"
)

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderWorkflow started", "OrderID", order.ID)

	// Activity options
	activityOptions := workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    5,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, activityOptions)

	// a is a nil pointer used only to reference activity methods by name;
	// the worker runs the OrderActivities instance registered with it
	var a *OrderActivities

	// Step 1: Validate Order
	err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
	if err != nil {
		logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
		return err
	}

	// Step 2: Process Payment
	err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
	if err != nil {
		logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
		return err
	}

	// Step 3: Update Inventory
	err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
	if err != nil {
		logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
		// In case of inventory update failure, we might need to refund the payment
		// This is where the saga pattern becomes useful, which we'll cover later
		return err
	}

	// Step 4: Arrange Shipping
	err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
	if err != nil {
		logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
		// If shipping fails, we might need to revert inventory and refund payment
		return err
	}

	logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
	return nil
}
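This workflow coordinates multiple activities, each representing one step of our order processing. Note how we use workflow.ExecuteActivity to run each activity, passing the order data as needed.
We’ve also set activity options that include a retry policy. This means that if an activity fails (for example, because of a transient network issue), Temporal will automatically retry it according to the policy we specified.
In the next section, we’ll explore how to handle long-running processes within this workflow structure.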
In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.
For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the HeartbeatTimeout configured in its activity options, Temporal can mark it as failed and potentially retry it.
Let’s modify our ArrangeShipping activity to include heartbeats:
func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
	logger := activity.GetLogger(ctx)
	logger.Info("Arranging shipping", "orderId", order.ID)

	// Simulate a long-running process
	for i := 0; i < 10; i++ {
		// Simulate one second of work, stopping early if the activity
		// context is cancelled or times out
		select {
		case <-ctx.Done():
			logger.Info("Shipping arrangement cancelled", "orderId", order.ID)
			return ctx.Err()
		case <-time.After(time.Second):
		}

		// Record heartbeat, passing the current step as progress details
		activity.RecordHeartbeat(ctx, i)
	}

	logger.Info("Shipping arranged", "orderId", order.ID)
	return nil
}
In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.
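Heartbeat details can also let a retried attempt pick up where the previous one stopped. The sketch below simulates that idea in plain Go without the Temporal SDK: `progressStore` is a hypothetical stand-in for the recorded heartbeat details, and `arrange` is an invented helper. In the Go SDK, `activity.HasHeartbeatDetails` and `activity.GetHeartbeatDetails` play the role of reading the last recorded progress.

```go
package main

import (
	"errors"
	"fmt"
)

// progressStore stands in for the heartbeat details kept between attempts.
// It is a hypothetical helper, not an SDK type.
type progressStore struct {
	lastStep int
	has      bool
}

func (p *progressStore) record(step int) { p.lastStep, p.has = step, true }

// arrange runs steps [0, total) and records progress after each one.
// failAt simulates a crash just before the given step (-1 means never).
func arrange(p *progressStore, total, failAt int) ([]int, error) {
	start := 0
	if p.has {
		start = p.lastStep + 1 // resume after the last recorded step
	}
	var done []int
	for i := start; i < total; i++ {
		if i == failAt {
			return done, errors.New("simulated worker crash")
		}
		done = append(done, i)
		p.record(i)
	}
	return done, nil
}

func main() {
	p := &progressStore{}
	first, err := arrange(p, 5, 3)
	fmt.Println(first, err) // prints [0 1 2] simulated worker crash
	second, err := arrange(p, 5, -1)
	fmt.Println(second, err) // prints [3 4] <nil>
}
```

The second attempt skips the steps that were already recorded, which is the main benefit of attaching progress to each heartbeat.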
For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.
Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:
func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
	logger := workflow.GetLogger(ctx)

	// Set up a timer for how long we want this workflow execution to run
	timerFired := workflow.NewTimer(ctx, 24*time.Hour)

	// Set up a selector to wait for either the timer to fire or the order to be delivered
	selector := workflow.NewSelector(ctx)
	var orderDelivered bool

	selector.AddFuture(timerFired, func(f workflow.Future) {
		// Timer fired; the continue-as-new error is returned at the end of the function
		logger.Info("24 hours passed, continuing as new", "orderID", orderID)
	})

	selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &orderDelivered)
		logger.Info("Order delivered signal received", "orderID", orderID)
	})

	selector.Select(ctx)

	if orderDelivered {
		logger.Info("Order tracking completed, order delivered", "orderID", orderID)
		return nil
	}

	// If we reach here, it means we're continuing as new
	return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}
In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.
By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.
In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.
Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.
Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:
func OrderWorkflow(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderWorkflow started", "OrderID", order.ID)

	// Define a retry policy
	retryPolicy := &temporal.RetryPolicy{
		InitialInterval:        time.Second,
		BackoffCoefficient:     2.0,
		MaximumInterval:        time.Minute,
		MaximumAttempts:        5,
		NonRetryableErrorTypes: []string{"InvalidOrderError"},
	}

	// Activity options with retry policy
	activityOptions := workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy:         retryPolicy,
	}
	ctx = workflow.WithActivityOptions(ctx, activityOptions)

	// a is a nil pointer used only to reference activity methods by name
	var a *OrderActivities

	// Execute activities with retry policy
	err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
	if err != nil {
		return handleOrderError(ctx, "ValidateOrder", err, order)
	}

	// ... (other activities)

	return nil
}
In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.
While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:
func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
	logger := activity.GetLogger(ctx)

	var err error
	for attempt := 1; attempt <= 3; attempt++ {
		err = a.processPayment(ctx, order)
		if err == nil {
			return nil
		}

		// PaymentDeclinedError is assumed to be an error type defined elsewhere
		var declined *PaymentDeclinedError
		if errors.As(err, &declined) {
			// Payment was declined, no point in retrying
			return err
		}

		logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)

		// Wait before the next attempt, but stop if the activity is cancelled
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(attempt) * time.Second):
		}
	}
	return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
	// Actual payment processing logic here
	// ...
	return nil
}
In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.
Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:
func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

	// Depending on the activity and error type, we might want to compensate.
	// CancelOrder and RefundPayment are assumed to be registered activities
	// defined elsewhere.
	switch activityName {
	case "ProcessPayment":
		// If payment processing failed, we might need to cancel the order
		_ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil)
	case "UpdateInventory":
		// If inventory update failed after payment, we might need to refund
		_ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
	}

	// Create a customer-facing error (message first, then the error type)
	return temporal.NewApplicationError("Failed to process order due to: "+err.Error(), "OrderProcessingFailed")
}
This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.
As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.
Here’s an example of how to implement versioning in our order processing workflow:
func OrderWorkflow(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderWorkflow started", "OrderID", order.ID)

	// a is a nil pointer used only to reference activity methods by name
	var a *OrderActivities

	// Use GetVersion to handle workflow versioning
	v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

	if v == workflow.DefaultVersion {
		// Old version: process payment before updating inventory
		err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
		if err != nil {
			return handleOrderError(ctx, "ProcessPayment", err, order)
		}

		err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
		if err != nil {
			return handleOrderError(ctx, "UpdateInventory", err, order)
		}
	} else {
		// New version: update inventory before processing payment
		err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
		if err != nil {
			return handleOrderError(ctx, "UpdateInventory", err, order)
		}

		err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
		if err != nil {
			return handleOrderError(ctx, "ProcessPayment", err, order)
		}
	}

	// ... rest of the workflow

	return nil
}
In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.
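Why running instances are unaffected can be hard to picture, so here is a deliberately simplified plain-Go model of the decision. It is not the SDK implementation: the `execution` type and this `getVersion` function are invented for illustration, and the model assumes a version choice is recorded once per execution and then reused.

```go
package main

import "fmt"

// defaultVersion marks code paths that predate a change, in the spirit of
// workflow.DefaultVersion.
const defaultVersion = -1

// execution models one workflow execution: the version markers stored in its
// history, and whether it already ran past the change point under old code.
type execution struct {
	markers    map[string]int
	ranOldCode bool
}

// getVersion is a simplified model of the versioning decision.
func getVersion(e *execution, changeID string, maxSupported int) int {
	if v, ok := e.markers[changeID]; ok {
		return v // a recorded decision always wins
	}
	if e.ranOldCode {
		return defaultVersion // executions that used the old path keep it
	}
	e.markers[changeID] = maxSupported // new executions record the new path
	return maxSupported
}

func main() {
	old := &execution{markers: map[string]int{}, ranOldCode: true}
	fresh := &execution{markers: map[string]int{}}

	fmt.Println(getVersion(old, "PaymentProcessing", 1))   // prints -1
	fmt.Println(getVersion(fresh, "PaymentProcessing", 1)) // prints 1
	fmt.Println(getVersion(fresh, "PaymentProcessing", 2)) // prints 1
}
```

The last call shows the point of recording the choice: even after a later deployment raises the maximum version, an execution that already chose version 1 keeps following that branch.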
When updating workflows in a production environment, consider the following strategies:
Incremental Changes: Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.
Compatibility Periods: Maintain compatibility with older versions for a certain period to allow running workflows to complete.
Feature Flags: Use feature flags in conjunction with workflow versions to control the rollout of new features.
Monitoring and Alerting: Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.
Rollback Plan: Always have a plan to roll back to the previous version if issues are detected with the new version.
By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.
In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.
The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.
Let’s design a saga for our order processing system that includes the following steps: reserve inventory, process payment, update inventory, and arrange shipping.
If any of these steps fail, we need to execute compensating actions for the steps that have already completed.
Here’s how we can implement this saga using Temporal:
func OrderSaga(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderSaga started", "OrderID", order.ID)

	// Activities need a timeout; retries are capped so that a failing step
	// eventually reaches the compensation logic
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 3},
	})

	// a is a nil pointer used only to reference activity methods by name
	var a *OrderActivities

	// Saga compensations: each entry runs a registered activity
	var compensations []func(workflow.Context) error

	// Step 1: Reserve Inventory
	err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to reserve inventory: %w", err)
	}
	compensations = append(compensations, func(ctx workflow.Context) error {
		return workflow.ExecuteActivity(ctx, a.ReleaseInventoryReservation, order).Get(ctx, nil)
	})

	// Step 2: Process Payment
	err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
	if err != nil {
		return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
	}
	compensations = append(compensations, func(ctx workflow.Context) error {
		return workflow.ExecuteActivity(ctx, a.RefundPayment, order).Get(ctx, nil)
	})

	// Step 3: Update Inventory
	err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
	if err != nil {
		return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
	}
	// No compensation is registered for this step in this example

	// Step 4: Arrange Shipping
	err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
	if err != nil {
		return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
	}

	logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
	return nil
}

func compensate(ctx workflow.Context, compensations []func(workflow.Context) error, err error) error {
	logger := workflow.GetLogger(ctx)
	logger.Error("Saga failed, executing compensations", "error", err)

	// Run compensations in reverse order of the completed steps
	for i := len(compensations) - 1; i >= 0; i-- {
		if compensationErr := compensations[i](ctx); compensationErr != nil {
			logger.Error("Compensation failed", "error", compensationErr)
			// In a real-world scenario, you might want to implement more sophisticated
			// error handling for failed compensations, such as retrying or alerting
		}
	}

	return err
}
In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.
This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.
Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.
Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:
func OrderWorkflow(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderWorkflow started", "OrderID", order.ID)

	// Record the start time using the workflow's deterministic clock
	startTime := workflow.Now(ctx)
	defer func() {
		duration := workflow.Now(ctx).Sub(startTime)
		workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
	}()

	// ... rest of the workflow implementation

	return nil
}
In this example, we’re recording the total time taken to process an order.
To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	// ... Temporal client setup (creates the client c and the activities value a)

	// Create a worker
	w := worker.New(c, "order-processing-task-queue", worker.Options{})

	// Register workflows and activities
	w.RegisterWorkflow(OrderWorkflow)
	w.RegisterActivity(a.ValidateOrder)
	// ... register other activities

	// Start the worker
	go func() {
		err := w.Run(worker.InterruptCh())
		if err != nil {
			log.Fatalln("Unable to start worker", err)
		}
	}()

	// Expose Prometheus metrics
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		err := http.ListenAndServe(":2112", nil)
		if err != nil {
			log.Fatalln("Unable to start metrics server", err)
		}
	}()

	// ... rest of your application
}
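This sets up a /metrics endpoint that Prometheus can scrape. Note that promhttp.Handler() serves the default Prometheus registry, so metrics emitted through the Temporal SDK (both our custom ones and the built-in ones) appear there only if the Temporal client is configured with a metrics handler that reports to that registry.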
Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:
func OrderWorkflow(ctx workflow.Context, order db.Order) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("OrderWorkflow started",
		"OrderID", order.ID,
		"CustomerID", order.CustomerID,
		"TotalAmount", order.TotalAmount,
	)

	// ... workflow implementation

	logger.Info("OrderWorkflow completed",
		"OrderID", order.ID,
		"Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
	)

	return nil
}
This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.
Distributed tracing can provide valuable insights into the flow of requests through our system. The Temporal Go SDK offers tracing interceptors (for example, through its OpenTelemetry contrib package), and we can also create spans directly inside our activities:
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
	// Keep the returned context so that calls made below join this span
	ctx, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
	defer span.End()

	span.SetAttributes(
		attribute.Int64("order.id", order.ID),
		attribute.Float64("order.amount", order.TotalAmount),
	)

	// ... payment processing logic (pass ctx to downstream calls)

	return nil
}
By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.
Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.
Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:
func TestOrderWorkflow(t *testing.T) {
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	// a is a nil pointer used only to reference activity methods by name
	var a *OrderActivities

	// Mock activities
	env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
	env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
	env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
	env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

	// Execute workflow
	env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())
}
This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.
It’s important to test that our saga compensations work correctly. Here’s an example test:
func TestOrderSagaCompensation(t *testing.T) {
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	// a is a nil pointer used only to reference activity methods by name
	var a *OrderActivities

	// Mock activities
	env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
	env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
	env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

	// Execute workflow
	env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

	require.True(t, env.IsWorkflowCompleted())
	require.Error(t, env.GetWorkflowError())

	// Verify that compensation was called
	env.AssertExpectations(t)
}
This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.
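As we implement and operate our advanced order processing system, there are several challenges and considerations to keep in mind:
Workflow Complexity: As workflows grow more complex, they can become difficult to understand and maintain. Regular refactoring and good documentation are essential.
Testing Long-Running Workflows: Testing workflows that might run for days or weeks can be challenging. Consider implementing mechanisms to speed up time in your tests.
Handling External Dependencies: External services may fail or become unavailable. Implement circuit breakers and fallback mechanisms to handle these scenarios.
Monitoring and Alerting: Set up comprehensive monitoring and alerting to quickly identify and respond to issues in your workflows.
Data Consistency: Ensure that your saga implementation maintains data consistency across services, even in the face of failures.
Performance Tuning: As your system scales, you may need to tune Temporal’s performance settings, such as the number of workflow and activity workers.
Workflow Versioning: Manage workflow versions carefully to ensure smooth updates without disrupting running instances.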
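In this post, we’ve taken a deep dive into advanced Temporal workflow concepts, implementing complex order processing logic, the saga pattern, and robust error handling. We’ve also covered monitoring, observability, and testing strategies for our workflows.
In the next part of our series, we’ll focus on advanced database operations using sqlc.
Stay tuned as we continue building our sophisticated order processing system!
Are you facing challenging problems, or do you need an outside perspective on a new idea or project? I can help! Whether you want to build a technical proof of concept before making a larger investment or need guidance on a difficult problem, I’m here to help.
If you’re interested in working with me, please reach out by email: hungaikevin@gmail.com.
Let’s turn your challenges into opportunities!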