ホームページ >バックエンド開発 >Golang >注文処理システムの実装: パート 高度な一時的ワークフロー

注文処理システムの実装: パート 高度な一時的ワークフロー

PHPz
PHPzオリジナル
2024-09-05 22:38:321100ブラウズ

Implementing an Order Processing System: Part  Advanced Temporal Workflows

1. はじめにと目標

洗練された注文処理システムの実装に関するシリーズへようこそ!前回の投稿では、基本的な CRUD API のセットアップ、Postgres データベースとの統合、および単純な Temporal ワークフローの実装により、プロジェクトの基礎を築きました。現在、私たちは Temporal ワークフローの世界をさらに深く掘り下げて、堅牢でスケーラブルな注文処理システムを作成しています。

前回の投稿の要約

パート 1 では、次のことを行います:

  • プロジェクト構造を設定する
  • Golang と Gin を使用して基本的な CRUD API を実装しました
  • Postgres データベースと統合
  • シンプルな時間ワークフローを作成しました
  • アプリケーションを Docker 化しました

この投稿の目標

この投稿では、Temporal の使用を大幅に拡張し、高度な概念を検討し、複雑なワークフローを実装します。この記事を読み終えるまでに、次のことができるようになります:

  1. 複数ステップの注文処理ワークフローを設計および実装する
  2. 長時間実行プロセスを効果的に処理する
  3. 堅牢なエラー処理と再試行メカニズムを実装します
  4. 本番環境で安全に更新するためのバージョンワークフロー
  5. 分散トランザクションのサガ パターンを実装する
  6. Temporal ワークフローの監視と可観測性を設定する

さあ、飛び込みましょう!

2. 理論的背景と概念

コーディングを開始する前に、高度な実装にとって重要となる重要な時間的概念をいくつか確認してみましょう。

一時的なワークフローとアクティビティ

Temporal では、ワークフローは、長時間実行されるビジネス ロジックを調整する永続的な機能です。ワークフローはフォールトトレラントであり、プロセスやマシンの障害にも耐えることができます。これらは、アプリケーションの状態遷移の信頼できる調整メカニズムと考えることができます。

一方、アクティビティはワークフローの構成要素です。これらは、API 呼び出しの実行、データベースへの書き込み、電子メールの送信など、明確に定義された単一のアクションまたはタスクを表します。アクティビティは、それを呼び出すワークフローとは独立して再試行できます。

ワークフローの実行、履歴、および状態の管理

ワークフローが実行されると、Temporal はその存続期間中に発生したすべてのイベントの履歴を保持します。この履歴は、ワークフローの状態に関する真実の情報源です。ワークフロー ワーカーが失敗して再起動した場合、この履歴を再生することでワークフローの状態を再構築できます。

このイベント ソーシング アプローチにより、Temporal は強力な一貫性を保証し、ワークフローのバージョン管理や新規として継続などの機能を有効にすることができます。

長時間実行プロセスの処理

Temporal は、数分から数日、場合によっては数か月にわたる長期間実行できるプロセスを処理するように設計されています。長時間実行されるアクティビティのハートビートや、大きな履歴を生成するワークフローの新規として継続などのメカニズムを提供します。

ワークフローのバージョン管理

システムが進化するにつれて、ワークフロー定義の更新が必要になる場合があります。 Temporal は、実行中のインスタンスに影響を与えることなく、ワークフローに重大な変更を加えることができるバージョン管理機能を提供します。

分散トランザクションの Saga パターン

Saga パターンは、分散トランザクション シナリオでマイクロサービス間のデータの一貫性を管理する方法です。これは、分散 ACID トランザクションを使用せずに複数のサービス間で一貫性を維持する必要がある場合に特に役立ちます。 Temporal は、物語を実装するための優れたフレームワークを提供します。

これらの概念を説明したので、高度な注文処理ワークフローの実装を開始しましょう。

3. 複雑な注文処理ワークフローの実装

注文の検証、支払い処理、在庫管理、出荷手配を含む、複数のステップからなる注文処理ワークフローを設計してみましょう。これらの各ステップを、ワークフローによって調整された個別のアクティビティとして実装します。

まず、アクティビティを定義しましょう:

// internal/workflow/activities.go

package workflow

import (
    "context"
    "errors"

    "go.temporal.io/sdk/activity"
    "github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
    queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
    return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
    // Implement order validation logic
    if order.TotalAmount <= 0 {
        return errors.New("invalid order amount")
    }
    // Add more validation as needed
    return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Implement payment processing logic
    // This could involve calling a payment gateway API
    activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
    // Simulate payment processing
    // In a real scenario, you'd integrate with a payment gateway here
    return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
    // Implement inventory update logic
    // This could involve updating stock levels in the database
    activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
    // Simulate inventory update
    // In a real scenario, you'd update your inventory management system here
    return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    // Implement shipping arrangement logic
    // This could involve calling a shipping provider's API
    activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
    // Simulate shipping arrangement
    // In a real scenario, you'd integrate with a shipping provider here
    return nil
}

それでは、複雑な注文処理ワークフローを実装してみましょう:

// internal/workflow/order_workflow.go

package workflow

import (
    "time"

    "go.temporal.io/sdk/workflow"
    "github.com/yourusername/order-processing-system/internal/db"
)

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Activity options
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval: time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval: time.Minute,
            MaximumAttempts: 5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Step 1: Validate Order
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
        // In case of inventory update failure, we might need to refund the payment
        // This is where the saga pattern becomes useful, which we'll cover later
        return err
    }

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
        // If shipping fails, we might need to revert inventory and refund payment
        return err
    }

    logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
    return nil
}

このワークフローは複数のアクティビティを調整し、それぞれが注文処理のステップを表します。 workflow.ExecuteActivity を使用して各アクティビティを実行し、必要に応じて注文データを渡していることに注目してください。

再試行ポリシーを備えたアクティビティ オプションも設定しました。これは、アクティビティが (一時的なネットワークの問題などにより) 失敗した場合、Temporal は指定されたポリシーに基づいて自動的に再試行することを意味します。

次のセクションでは、このワークフロー構造内で長時間実行プロセスを処理する方法を検討します。

4. Handling Long-Running Processes with Temporal

In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.

Heartbeats for Long-Running Activities

For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.

Let’s modify our ArrangeShipping activity to include heartbeats:

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Arranging shipping", "orderId", order.ID)

    // Simulate a long-running process
    for i := 0; i < 10; i++ {
        // Simulate work
        time.Sleep(time.Second)

        // Record heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Check if we need to cancel
        if activity.GetInfo(ctx).Attempt > 1 {
            logger.Info("Cancelling shipping arrangement due to retry", "orderId", order.ID)
            return nil
        }
    }

    logger.Info("Shipping arranged", "orderId", order.ID)
    return nil
}

In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.

Using Continue-As-New for Very Long-Running Workflows

For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.

Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:

func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Set up a timer for how long we want this workflow execution to run
    timerFired := workflow.NewTimer(ctx, 24*time.Hour)

    // Set up a selector to wait for either the timer to fire or the order to be delivered
    selector := workflow.NewSelector(ctx)

    var orderDelivered bool
    selector.AddFuture(timerFired, func(f workflow.Future) {
        // Timer fired, we'll continue-as-new
        logger.Info("24 hours passed, continuing as new", "orderID", orderID)
        workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
    })

    selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderDelivered)
        logger.Info("Order delivered signal received", "orderID", orderID)
    })

    selector.Select(ctx)

    if orderDelivered {
        logger.Info("Order tracking completed, order delivered", "orderID", orderID)
        return nil
    }

    // If we reach here, it means we're continuing as new
    return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}

In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.

By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.

In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.

5. Implementing Retry Logic and Error Handling

Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.

Configuring Retry Policies for Activities

Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define a retry policy
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval: time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval: time.Minute,
        MaximumAttempts: 5,
        NonRetryableErrorTypes: []string{"InvalidOrderError"},
    }

    // Activity options with retry policy
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Execute activities with retry policy
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        return handleOrderError(ctx, "ValidateOrder", err, order)
    }

    // ... (other activities)

    return nil
}

In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.

Implementing Custom Retry Logic

While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:

func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        err = a.processPayment(ctx, order)
        if err == nil {
            return nil
        }

        if _, ok := err.(*PaymentDeclinedError); ok {
            // Payment was declined, no point in retrying
            return err
        }

        logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
    // Actual payment processing logic here
    // ...
}

In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.

Handling and Propagating Errors

Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:

func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

    // Depending on the activity and error type, we might want to compensate
    switch activityName {
    case "ProcessPayment":
        // If payment processing failed, we might need to cancel the order
        _ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil)
    case "UpdateInventory":
        // If inventory update failed after payment, we might need to refund
        _ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
    }

    // Create a customer-facing error message
    return workflow.NewCustomError("OrderProcessingFailed", "Failed to process order due to: "+err.Error())
}

This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.

6. Versioning Workflows for Safe Updates

As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.

Implementing Versioned Workflows

Here’s an example of how to implement versioning in our order processing workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Use GetVersion to handle workflow versioning
    v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

    if v == workflow.DefaultVersion {
        // Old version: process payment before updating inventory
        err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }
    } else {
        // New version: update inventory before processing payment
        err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }
    }

    // ... rest of the workflow

    return nil
}

In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.

Strategies for Updating Workflows in Production

When updating workflows in a production environment, consider the following strategies:

  1. Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.

  2. Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.

  3. Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.

  4. Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.

  5. Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.

By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.

In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.

7. Implementing Saga Patterns for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.

Designing a Saga for Our Order Processing System

Let’s design a saga for our order processing system that includes the following steps:

  1. Reserve Inventory
  2. Process Payment
  3. Update Inventory
  4. Arrange Shipping

If any of these steps fail, we need to execute compensating actions for the steps that have already completed.

Here’s how we can implement this saga using Temporal:

func OrderSaga(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderSaga started", "OrderID", order.ID)

    // Saga compensations
    var compensations []func(context.Context) error

    // Step 1: Reserve Inventory
    err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.ReleaseInventoryReservation(ctx, order)
    })

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.RefundPayment(ctx, order)
    })

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
    }
    // No compensation needed for this step, as we've already updated the inventory

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
    }

    logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
    return nil
}

func compensate(ctx workflow.Context, compensations []func(context.Context) error, err error) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Saga failed, executing compensations", "error", err)

    for i := len(compensations) - 1; i >= 0; i-- {
        compensationErr := workflow.ExecuteActivity(ctx, compensations[i]).Get(ctx, nil)
        if compensationErr != nil {
            logger.Error("Compensation failed", "error", compensationErr)
            // In a real-world scenario, you might want to implement more sophisticated
            // error handling for failed compensations, such as retrying or alerting
        }
    }

    return err
}

In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.

This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.

8. Monitoring and Observability for Temporal Workflows

Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.

Implementing Custom Metrics

Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define metric
    orderProcessingTime := workflow.NewTimer(ctx, 0)
    defer func() {
        duration := orderProcessingTime.ElapsedTime()
        workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
    }()

    // ... rest of the workflow implementation

    return nil
}

In this example, we’re recording the total time taken to process an order.

Integrating with Prometheus

To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // ... Temporal client setup

    // Create a worker
    w := worker.New(c, "order-processing-task-queue", worker.Options{})

    // Register workflows and activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(a.ValidateOrder)
    // ... register other activities

    // Start the worker
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            logger.Fatal("Unable to start worker", err)
        }
    }()

    // Expose Prometheus metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        err := http.ListenAndServe(":2112", nil)
        if err != nil {
            logger.Fatal("Unable to start metrics server", err)
        }
    }()

    // ... rest of your application
}

This sets up a /metrics endpoint that Prometheus can scrape to collect our custom metrics along with the built-in Temporal metrics.

Implementing Structured Logging

Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started",
        "OrderID", order.ID,
        "CustomerID", order.CustomerID,
        "TotalAmount", order.TotalAmount,
    )

    // ... workflow implementation

    logger.Info("OrderWorkflow completed",
        "OrderID", order.ID,
        "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
    )

    return nil
}

This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.

Setting Up Distributed Tracing

Distributed tracing can provide valuable insights into the flow of requests through our system. While Temporal doesn’t natively support distributed tracing, we can implement it in our activities:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    _, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
    defer span.End()

    span.SetAttributes(
        attribute.Int64("order.id", order.ID),
        attribute.Float64("order.amount", order.TotalAmount),
    )

    // ... payment processing logic

    return nil
}

By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.

9. Testing and Validation

Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.

Unit Testing Workflows

Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:

func TestOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.

Testing Saga Compensations

It’s important to test that our saga compensations work correctly. Here’s an example test:

func TestOrderSagaCompensation(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
    env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.Error(t, env.GetWorkflowError())

    // Verify that compensation was called
    env.AssertExpectations(t)
}

This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.

10. Cabaran dan Pertimbangan

Semasa kami melaksanakan dan mengendalikan sistem pemprosesan pesanan lanjutan kami, terdapat beberapa cabaran dan pertimbangan yang perlu diingat:

  1. Kerumitan Aliran Kerja : Apabila aliran kerja semakin kompleks, ia boleh menjadi sukar untuk difahami dan dikekalkan. Pemfaktoran semula yang kerap dan dokumentasi yang baik adalah penting.

  2. Menguji Aliran Kerja Berjangka Panjang : Menguji aliran kerja yang mungkin berjalan selama beberapa hari atau minggu boleh menjadi mencabar. Pertimbangkan untuk melaksanakan mekanisme untuk mempercepatkan masa dalam ujian anda.

  3. Mengendalikan Ketergantungan Luaran : Perkhidmatan luaran mungkin gagal atau tidak tersedia. Laksanakan pemutus litar dan mekanisme sandaran untuk mengendalikan senario ini.

  4. Pemantauan dan Makluman : Sediakan pemantauan dan makluman menyeluruh untuk mengenal pasti dan bertindak balas dengan cepat kepada isu dalam aliran kerja anda.

  5. Ketekalan Data : Pastikan pelaksanaan saga anda mengekalkan ketekalan data merentas perkhidmatan, walaupun dalam menghadapi kegagalan.

  6. Penalaan Prestasi : Semasa sistem anda berskala, anda mungkin perlu menala tetapan prestasi Temporal, seperti bilangan aliran kerja dan pekerja aktiviti.

  7. Versi Aliran Kerja : Urus versi aliran kerja dengan berhati-hati untuk memastikan kemas kini lancar tanpa melanggar kejadian yang sedang berjalan.

11. Langkah Seterusnya dan Pratonton Bahagian 3

Dalam siaran ini, kami telah mendalami konsep aliran kerja Temporal lanjutan, melaksanakan logik pemprosesan pesanan yang kompleks, corak saga dan pengendalian ralat yang mantap. Kami juga telah merangkumi strategi pemantauan, kebolehmerhatian dan ujian untuk aliran kerja kami.

Dalam bahagian seterusnya siri kami, kami akan menumpukan pada operasi pangkalan data lanjutan dengan sqlc. Kami akan meliputi:

  1. Melaksanakan pertanyaan dan transaksi pangkalan data yang kompleks
  2. Mengoptimumkan prestasi pangkalan data
  3. Melaksanakan operasi kelompok
  4. Mengendalikan migrasi pangkalan data dalam persekitaran pengeluaran
  5. Melaksanakan sharding pangkalan data untuk kebolehskalaan
  6. Memastikan ketekalan data dalam sistem teragih

Nantikan semasa kami terus membina sistem pemprosesan pesanan kami yang canggih!


Perlukan Bantuan?

Adakah anda menghadapi masalah yang mencabar, atau memerlukan perspektif luaran tentang idea atau projek baharu? Saya boleh tolong! Sama ada anda ingin membina konsep bukti teknologi sebelum membuat pelaburan yang lebih besar, atau anda memerlukan panduan tentang isu yang sukar, saya sedia membantu.

Perkhidmatan yang Ditawarkan:

  • Penyelesaian Masalah: Menangani isu yang rumit dengan penyelesaian yang inovatif.
  • Perundingan: Memberikan nasihat pakar dan pandangan baharu tentang projek anda.
  • Bukti Konsep: Membangunkan model awal untuk menguji dan mengesahkan idea anda.

Jika anda berminat untuk bekerja dengan saya, sila hubungi melalui e-mel di hungaikevin@gmail.com.

Mari jadikan cabaran anda sebagai peluang!

以上が注文処理システムの実装: パート 高度な一時的ワークフローの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。