>백엔드 개발 >Golang >주문 처리 시스템 구현: 고급 임시 워크플로우 부분

주문 처리 시스템 구현: 고급 임시 워크플로우 부분

PHPz
PHPz원래의
2024-09-05 22:38:321141검색

Implementing an Order Processing System: Part  Advanced Temporal Workflows

1. Einführung und Ziele

Willkommen zurück zu unserer Serie über die Implementierung eines ausgefeilten Auftragsabwicklungssystems! In unserem vorherigen Beitrag haben wir den Grundstein für unser Projekt gelegt, indem wir eine grundlegende CRUD-API eingerichtet, in eine Postgres-Datenbank integriert und einen einfachen temporären Workflow implementiert haben. Heute tauchen wir tiefer in die Welt der zeitlichen Arbeitsabläufe ein, um ein robustes, skalierbares Auftragsabwicklungssystem zu schaffen.

Zusammenfassung des vorherigen Beitrags

In Teil 1:

  • Richten Sie unsere Projektstruktur ein
  • Implementierte eine grundlegende CRUD-API mit Golang und Gin
  • In eine Postgres-Datenbank integriert
  • Einen einfachen zeitlichen Workflow erstellt
  • Dockerisierte unsere Anwendung

Ziele für diesen Beitrag

In diesem Beitrag werden wir unsere Nutzung von Temporal deutlich erweitern, indem wir fortgeschrittene Konzepte erkunden und komplexe Arbeitsabläufe implementieren. Am Ende dieses Artikels werden Sie in der Lage sein:

  1. Entwerfen und implementieren Sie mehrstufige Auftragsabwicklungsworkflows
  2. Behandeln Sie langwierige Prozesse effektiv
  3. Implementieren Sie robuste Fehlerbehandlungs- und Wiederholungsmechanismen
  4. Versionsworkflows für sichere Updates in der Produktion
  5. Implementieren Sie Saga-Muster für verteilte Transaktionen
  6. Richten Sie Überwachung und Beobachtbarkeit für zeitliche Arbeitsabläufe ein

Lass uns eintauchen!

2. Theoretischer Hintergrund und Konzepte

Bevor wir mit dem Codieren beginnen, schauen wir uns einige wichtige zeitliche Konzepte an, die für unsere fortgeschrittene Implementierung von entscheidender Bedeutung sein werden.

Zeitliche Arbeitsabläufe und Aktivitäten

In Temporal ist ein Workflow eine dauerhafte Funktion, die langfristige Geschäftslogik orchestriert. Arbeitsabläufe sind fehlertolerant und können Prozess- und Maschinenausfälle überstehen. Sie können als zuverlässige Koordinierungsmechanismen für die Zustandsübergänge Ihrer Anwendung betrachtet werden.

Aktivitäten hingegen sind die Bausteine ​​eines Workflows. Sie stellen eine einzelne, genau definierte Aktion oder Aufgabe dar, z. B. das Tätigen eines API-Aufrufs, das Schreiben in eine Datenbank oder das Senden einer E-Mail. Aktivitäten können unabhängig vom Workflow, der sie aufruft, wiederholt werden.

Workflow-Ausführung, Verlauf und Statusverwaltung

Wenn ein Workflow ausgeführt wird, verwaltet Temporal einen Verlauf aller Ereignisse, die während seiner Lebensdauer auftreten. Dieser Verlauf ist die Quelle der Wahrheit für den Status des Workflows. Wenn ein Workflow-Worker fehlschlägt und neu startet, kann er den Status des Workflows wiederherstellen, indem er diesen Verlauf erneut abspielt.

Dieser Event-Sourcing-Ansatz ermöglicht es Temporal, starke Konsistenzgarantien zu bieten und Funktionen wie Workflow-Versionierung und Fortsetzung als neu zu ermöglichen.

Umgang mit lang andauernden Prozessen

Temporal ist für die Verarbeitung von Prozessen konzipiert, die über längere Zeiträume laufen können – von Minuten über Tage bis hin zu Monaten. Es bietet Mechanismen wie Heartbeats für lang laufende Aktivitäten und die Fortsetzung als neu für Workflows, die große Historien generieren.

Workflow-Versionierung

Da sich Ihr System weiterentwickelt, müssen Sie möglicherweise Workflow-Definitionen aktualisieren. Temporal bietet Versionierungsfunktionen, die es Ihnen ermöglichen, unterbrechungsfreie Änderungen an Arbeitsabläufen vorzunehmen, ohne laufende Instanzen zu beeinträchtigen.

Saga-Muster für verteilte Transaktionen

Das Saga-Muster ist eine Möglichkeit, die Datenkonsistenz über Mikrodienste hinweg in verteilten Transaktionsszenarien zu verwalten. Dies ist besonders nützlich, wenn Sie die Konsistenz über mehrere Dienste hinweg aufrechterhalten müssen, ohne verteilte ACID-Transaktionen zu verwenden. Temporal bietet einen hervorragenden Rahmen für die Umsetzung von Sagen.

Nachdem wir uns nun mit diesen Konzepten befasst haben, beginnen wir mit der Implementierung unseres erweiterten Auftragsverarbeitungs-Workflows.

3. Implementierung komplexer Auftragsabwicklungs-Workflows

Lassen Sie uns einen mehrstufigen Auftragsabwicklungsworkflow entwerfen, der Auftragsvalidierung, Zahlungsabwicklung, Bestandsverwaltung und Versandvereinbarung umfasst. Wir werden jeden dieser Schritte als separate Aktivitäten implementieren, die durch einen Workflow koordiniert werden.

Lassen Sie uns zunächst unsere Aktivitäten definieren:

// internal/workflow/activities.go

package workflow

import (
    "context"
    "errors"

    "go.temporal.io/sdk/activity"
    "github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
    queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
    return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
    // Implement order validation logic
    if order.TotalAmount <= 0 {
        return errors.New("invalid order amount")
    }
    // Add more validation as needed
    return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Implement payment processing logic
    // This could involve calling a payment gateway API
    activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
    // Simulate payment processing
    // In a real scenario, you'd integrate with a payment gateway here
    return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
    // Implement inventory update logic
    // This could involve updating stock levels in the database
    activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
    // Simulate inventory update
    // In a real scenario, you'd update your inventory management system here
    return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    // Implement shipping arrangement logic
    // This could involve calling a shipping provider's API
    activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
    // Simulate shipping arrangement
    // In a real scenario, you'd integrate with a shipping provider here
    return nil
}

Jetzt implementieren wir unseren komplexen Auftragsabwicklungsworkflow:

// internal/workflow/order_workflow.go

package workflow

import (
    "time"

    "go.temporal.io/sdk/workflow"
    "github.com/yourusername/order-processing-system/internal/db"
)

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Activity options
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval: time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval: time.Minute,
            MaximumAttempts: 5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Step 1: Validate Order
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
        // In case of inventory update failure, we might need to refund the payment
        // This is where the saga pattern becomes useful, which we'll cover later
        return err
    }

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
        // If shipping fails, we might need to revert inventory and refund payment
        return err
    }

    logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
    return nil
}

Dieser Workflow koordiniert mehrere Aktivitäten, die jeweils einen Schritt in unserer Auftragsabwicklung darstellen. Beachten Sie, wie wir Workflow.ExecuteActivity verwenden, um jede Aktivität auszuführen und die Bestelldaten nach Bedarf zu übergeben.

Wir haben außerdem Aktivitätsoptionen mit einer Wiederholungsrichtlinie eingerichtet. Das heißt, wenn eine Aktivität fehlschlägt (z. B. aufgrund eines vorübergehenden Netzwerkproblems), wird Temporal sie basierend auf unserer angegebenen Richtlinie automatisch wiederholen.

Im nächsten Abschnitt werden wir untersuchen, wie man mit lang andauernden Prozessen innerhalb dieser Workflow-Struktur umgeht.

4. Handling Long-Running Processes with Temporal

In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.

Heartbeats for Long-Running Activities

For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.

Let’s modify our ArrangeShipping activity to include heartbeats:

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Arranging shipping", "orderId", order.ID)

    // Simulate a long-running process
    for i := 0; i < 10; i++ {
        // Simulate work
        time.Sleep(time.Second)

        // Record heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Check if we need to cancel
        if activity.GetInfo(ctx).Attempt > 1 {
            logger.Info("Cancelling shipping arrangement due to retry", "orderId", order.ID)
            return nil
        }
    }

    logger.Info("Shipping arranged", "orderId", order.ID)
    return nil
}

In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.

Using Continue-As-New for Very Long-Running Workflows

For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.

Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:

func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Set up a timer for how long we want this workflow execution to run
    timerFired := workflow.NewTimer(ctx, 24*time.Hour)

    // Set up a selector to wait for either the timer to fire or the order to be delivered
    selector := workflow.NewSelector(ctx)

    var orderDelivered bool
    selector.AddFuture(timerFired, func(f workflow.Future) {
        // Timer fired, we'll continue-as-new
        logger.Info("24 hours passed, continuing as new", "orderID", orderID)
        workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
    })

    selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderDelivered)
        logger.Info("Order delivered signal received", "orderID", orderID)
    })

    selector.Select(ctx)

    if orderDelivered {
        logger.Info("Order tracking completed, order delivered", "orderID", orderID)
        return nil
    }

    // If we reach here, it means we're continuing as new
    return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}

In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.

By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.

In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.

5. Implementing Retry Logic and Error Handling

Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.

Configuring Retry Policies for Activities

Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define a retry policy
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval: time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval: time.Minute,
        MaximumAttempts: 5,
        NonRetryableErrorTypes: []string{"InvalidOrderError"},
    }

    // Activity options with retry policy
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Execute activities with retry policy
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        return handleOrderError(ctx, "ValidateOrder", err, order)
    }

    // ... (other activities)

    return nil
}

In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.

Implementing Custom Retry Logic

While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:

func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        err = a.processPayment(ctx, order)
        if err == nil {
            return nil
        }

        if _, ok := err.(*PaymentDeclinedError); ok {
            // Payment was declined, no point in retrying
            return err
        }

        logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
    // Actual payment processing logic here
    // ...
}

In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.

Handling and Propagating Errors

Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:

func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

    // Depending on the activity and error type, we might want to compensate
    switch activityName {
    case "ProcessPayment":
        // If payment processing failed, we might need to cancel the order
        _ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil)
    case "UpdateInventory":
        // If inventory update failed after payment, we might need to refund
        _ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
    }

    // Create a customer-facing error message
    return workflow.NewCustomError("OrderProcessingFailed", "Failed to process order due to: "+err.Error())
}

This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.

6. Versioning Workflows for Safe Updates

As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.

Implementing Versioned Workflows

Here’s an example of how to implement versioning in our order processing workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Use GetVersion to handle workflow versioning
    v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

    if v == workflow.DefaultVersion {
        // Old version: process payment before updating inventory
        err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }
    } else {
        // New version: update inventory before processing payment
        err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }
    }

    // ... rest of the workflow

    return nil
}

In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.

Strategies for Updating Workflows in Production

When updating workflows in a production environment, consider the following strategies:

  1. Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.

  2. Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.

  3. Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.

  4. Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.

  5. Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.

By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.

In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.

7. Implementing Saga Patterns for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.

Designing a Saga for Our Order Processing System

Let’s design a saga for our order processing system that includes the following steps:

  1. Reserve Inventory
  2. Process Payment
  3. Update Inventory
  4. Arrange Shipping

If any of these steps fail, we need to execute compensating actions for the steps that have already completed.

Here’s how we can implement this saga using Temporal:

func OrderSaga(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderSaga started", "OrderID", order.ID)

    // Saga compensations
    var compensations []func(context.Context) error

    // Step 1: Reserve Inventory
    err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.ReleaseInventoryReservation(ctx, order)
    })

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.RefundPayment(ctx, order)
    })

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
    }
    // No compensation needed for this step, as we've already updated the inventory

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
    }

    logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
    return nil
}

func compensate(ctx workflow.Context, compensations []func(context.Context) error, err error) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Saga failed, executing compensations", "error", err)

    for i := len(compensations) - 1; i >= 0; i-- {
        compensationErr := workflow.ExecuteActivity(ctx, compensations[i]).Get(ctx, nil)
        if compensationErr != nil {
            logger.Error("Compensation failed", "error", compensationErr)
            // In a real-world scenario, you might want to implement more sophisticated
            // error handling for failed compensations, such as retrying or alerting
        }
    }

    return err
}

In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.

This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.

8. Monitoring and Observability for Temporal Workflows

Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.

Implementing Custom Metrics

Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define metric
    orderProcessingTime := workflow.NewTimer(ctx, 0)
    defer func() {
        duration := orderProcessingTime.ElapsedTime()
        workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
    }()

    // ... rest of the workflow implementation

    return nil
}

In this example, we’re recording the total time taken to process an order.

Integrating with Prometheus

To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // ... Temporal client setup

    // Create a worker
    w := worker.New(c, "order-processing-task-queue", worker.Options{})

    // Register workflows and activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(a.ValidateOrder)
    // ... register other activities

    // Start the worker
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            logger.Fatal("Unable to start worker", err)
        }
    }()

    // Expose Prometheus metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        err := http.ListenAndServe(":2112", nil)
        if err != nil {
            logger.Fatal("Unable to start metrics server", err)
        }
    }()

    // ... rest of your application
}

This sets up a /metrics endpoint that Prometheus can scrape to collect our custom metrics along with the built-in Temporal metrics.

Implementing Structured Logging

Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started",
        "OrderID", order.ID,
        "CustomerID", order.CustomerID,
        "TotalAmount", order.TotalAmount,
    )

    // ... workflow implementation

    logger.Info("OrderWorkflow completed",
        "OrderID", order.ID,
        "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
    )

    return nil
}

This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.

Setting Up Distributed Tracing

Distributed tracing can provide valuable insights into the flow of requests through our system. While Temporal doesn’t natively support distributed tracing, we can implement it in our activities:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    _, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
    defer span.End()

    span.SetAttributes(
        attribute.Int64("order.id", order.ID),
        attribute.Float64("order.amount", order.TotalAmount),
    )

    // ... payment processing logic

    return nil
}

By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.

9. Testing and Validation

Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.

Unit Testing Workflows

Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:

func TestOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.

Testing Saga Compensations

It’s important to test that our saga compensations work correctly. Here’s an example test:

func TestOrderSagaCompensation(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
    env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.Error(t, env.GetWorkflowError())

    // Verify that compensation was called
    env.AssertExpectations(t)
}

This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.

10. 과제와 고려사항

고급 주문 처리 시스템을 구현하고 운영할 때 염두에 두어야 할 몇 가지 과제와 고려 사항이 있습니다.

  1. 워크플로 복잡성 : 워크플로가 더욱 복잡해지면 이해하고 유지 관리하기가 어려워질 수 있습니다. 정기적인 리팩토링과 좋은 문서화가 중요합니다.

  2. 장기 실행 워크플로 테스트 : 며칠 또는 몇 주 동안 실행될 수 있는 워크플로를 테스트하는 것은 어려울 수 있습니다. 테스트 시간을 단축할 수 있는 메커니즘 구현을 고려해 보세요.

  3. 외부 종속성 처리 : 외부 서비스가 실패하거나 사용 불가능해질 수 있습니다. 이러한 시나리오를 처리하려면 회로 차단기와 대체 메커니즘을 구현하세요.

  4. 모니터링 및 알림 : 워크플로의 문제를 신속하게 식별하고 대응할 수 있도록 포괄적인 모니터링 및 알림을 설정하세요.

  5. 데이터 일관성 : 오류가 발생하더라도 사가 구현이 서비스 전체에서 데이터 일관성을 유지하는지 확인하세요.

  6. 성능 조정 : 시스템이 확장됨에 따라 워크플로 및 활동 작업자 수와 같은 Temporal의 성능 설정을 조정해야 할 수도 있습니다.

  7. 워크플로 버전 관리 : 실행 중인 인스턴스를 중단하지 않고 원활하게 업데이트할 수 있도록 워크플로 버전을 신중하게 관리하세요.

11. 다음 단계 및 3부 미리보기

이 게시물에서는 고급 임시 워크플로 개념, 복잡한 주문 처리 논리, 사가 패턴 및 강력한 오류 처리 구현을 자세히 살펴보았습니다. 또한 워크플로에 대한 모니터링, 관찰 가능성 및 테스트 전략도 다루었습니다.

시리즈의 다음 부분에서는 sqlc를 사용한 고급 데이터베이스 작업에 중점을 둘 것입니다. 우리가 다룰 내용은 다음과 같습니다.

  1. 복잡한 데이터베이스 쿼리 및 트랜잭션 구현
  2. 데이터베이스 성능 최적화
  3. 일괄 작업 구현
  4. 프로덕션 환경에서 데이터베이스 마이그레이션 처리
  5. 확장성을 위한 데이터베이스 샤딩 구현
  6. 분산 시스템의 데이터 일관성 보장

정교한 주문 처리 시스템을 지속적으로 구축해 나가고 있으니 지켜봐 주시기 바랍니다!


도움이 필요하신가요?

어려운 문제에 직면했거나 새로운 아이디어나 프로젝트에 대한 외부 관점이 필요합니까? 제가 도와드릴 수 있어요! 대규모 투자를 하기 전에 기술 개념 증명을 구축하려는 경우나 어려운 문제에 대한 지침이 필요한 경우 제가 도와드리겠습니다.

제공되는 서비스:

  • 문제 해결: 혁신적인 솔루션으로 복잡한 문제를 해결합니다.
  • 상담: 프로젝트에 대한 전문가의 조언과 신선한 관점을 제공합니다.
  • 개념 증명: 아이디어를 테스트하고 검증하기 위한 예비 모델 개발

저와 함께 일하는 데 관심이 있으시면 hangaikevin@gmail.com으로 이메일을 보내주세요.

당신의 도전을 기회로 바꾸세요!

위 내용은 주문 처리 시스템 구현: 고급 임시 워크플로우 부분의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.