Building a Scalable SQS Consumer in Go

Introduction

When building distributed systems, message queues like Amazon SQS play a crucial role in handling asynchronous workloads. In this post, I'll share my experience implementing a robust SQS consumer in Go that handles user registration events for Keycloak. The solution uses the fan-out/fan-in concurrency pattern to process messages efficiently without overwhelming system resources.

The Challenge

I faced an interesting problem: processing around 50,000 SQS events daily to register users in Keycloak. A naive approach would spawn a new goroutine for each message, but a burst of messages could then exhaust memory or overwhelm Keycloak with concurrent requests. We needed bounded concurrency.

Why Fan-out/Fan-in?

The fan-out/fan-in pattern is perfect for this use case because it:

  • Maintains a fixed pool of worker goroutines
  • Hands each message to the next free worker, so load spreads across the pool
  • Prevents resource exhaustion
  • Provides better control over concurrent operations
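To make the pattern concrete, here is a minimal, generic sketch of fan-out/fan-in with a fixed worker pool. It is not the consumer from this post: the function name fanOutFanIn and the integer workload are placeholders chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutFanIn runs fn over every input using a fixed number of workers
// (fan-out) and collects all results on a single channel (fan-in).
func fanOutFanIn(workers int, inputs []int, fn func(int) int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- fn(j)
			}
		}()
	}

	// Feed the jobs, then close the channel so workers leave their range loop.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	squares := fanOutFanIn(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * n })
	sum := 0
	for _, s := range squares {
		sum += s
	}
	fmt.Println(len(squares), sum) // prints: 5 55 (result order is not guaranteed)
}
```

No matter how many inputs arrive, only three goroutines ever run fn at the same time, which is the property that keeps resource usage bounded.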

Implementation Deep Dive

1. The Consumer Structure

First, let's look at our basic consumer structure:

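// Consumer bundles the SQS client with the queue it reads from.
type Consumer struct {
    Client    *sqs.Client
    QueueName string // passed to SQS as QueueUrl, so it must hold the queue URL
}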

2. Message Processing Pipeline

The implementation consists of three main components:

  1. Message Receiver: Continuously polls SQS for new messages
  2. Worker Pool: Fixed number of goroutines processing messages
  3. Message Channel: Connects the receiver to workers

Here's how we start the consumer:

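import (
    "context"
    "sync"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// StartPool starts the worker pool, then polls the queue for messages
// that the workers hand to serviceFunc.
func StartPool[requestBody any](
    serviceFunc func(c context.Context, dto *requestBody) error,
    consumer *Consumer) {

    ctx := context.Background()
    params := &sqs.ReceiveMessageInput{
        MaxNumberOfMessages: 10,                             // SQS returns at most 10 messages per call
        QueueUrl:            aws.String(consumer.QueueName), // must be the queue URL
        WaitTimeSeconds:     20,                             // long polling
        VisibilityTimeout:   30,                             // seconds a received message stays hidden
        // "All" requests every message attribute.
        MessageAttributeNames: []string{"All"},
    }

    // Unbuffered channel: the receiver blocks until a worker is free.
    msgCh := make(chan types.Message)
    var wg sync.WaitGroup

    // Start worker pool first
    startPool(ctx, msgCh, &wg, consumer, serviceFunc)

    // Then start receiving messages
    // ... rest of the implementation
}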

3. Key Configuration Parameters

Let's examine the crucial SQS configuration parameters:

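  • MaxNumberOfMessages (10): Batch size for each poll; 10 is the most SQS returns per ReceiveMessage call
  • WaitTimeSeconds (20): Long polling duration in seconds; 20 is the maximum, and it cuts down on empty responses
  • VisibilityTimeout (30): Seconds a received message stays hidden from other consumers; if processing takes longer, SQS redelivers it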

4. Worker Pool Implementation

The worker pool is where the fan-out pattern comes into play:

func startPool[requestBody any](
    ctx context.Context,
    msgCh chan types.Message,
    wg *sync.WaitGroup,
    consumer *Consumer,
    serviceFunc func(c context.Context, dto *requestBody) error) {

    processingMessages := &sync.Map{}

    // Start 10 workers
    for i := 0; i < 10; i++ {
        // ... rest of the worker implementation
    }
}



5. Duplicate Message Handling

We use a sync.Map (the processingMessages map created in startPool) to prevent processing duplicate messages.
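The sync.Map-based duplicate check described above could look roughly like the sketch below. The inFlight type and its tryClaim and release methods are hypothetical names, and keying by message ID is an assumption; the post does not say which key it uses.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight tracks the IDs of messages a worker is currently processing.
type inFlight struct {
	ids sync.Map
}

// tryClaim reports whether the caller is the first to claim id.
// LoadOrStore is atomic, so only one goroutine wins for a given id.
func (f *inFlight) tryClaim(id string) bool {
	_, alreadyClaimed := f.ids.LoadOrStore(id, struct{}{})
	return !alreadyClaimed
}

// release frees id once processing has finished, whether it succeeded or not.
func (f *inFlight) release(id string) {
	f.ids.Delete(id)
}

func main() {
	var f inFlight
	fmt.Println(f.tryClaim("msg-1")) // prints: true
	fmt.Println(f.tryClaim("msg-1")) // prints: false
	f.release("msg-1")
	fmt.Println(f.tryClaim("msg-1")) // prints: true
}
```

A check like this only guards against duplicates inside one process. Standard SQS queues deliver at least once, so with several consumer instances the downstream operation still needs to be idempotent.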

Best Practices and Learnings

  1. Error Handling: Always handle errors gracefully and log them appropriately
  2. Message Cleanup: Delete messages only after successful processing
  3. Graceful Shutdown: Implement proper shutdown mechanisms using context
  4. Monitoring: Add logging at key points for observability

Performance Considerations

  • Worker Count: Choose based on your workload and available resources
  • Batch Size: Balance between throughput and processing time
  • Visibility Timeout: Set it above your worst-case processing time, not just the average; otherwise slow messages are redelivered while still being processed

Future Improvements

  1. Dynamic Worker Scaling: Adjust worker count based on queue depth
  2. Circuit Breaker: Add circuit breaker for downstream services
  3. Metrics Collection: Add Prometheus metrics for monitoring
  4. Dead Letter Queue: Implement DLQ handling for failed messages
  5. Retries: Add exponential backoff for transient failures

Conclusion

The fan-out/fan-in pattern provides an elegant solution for processing high-volume SQS messages in Go. By maintaining a fixed worker pool, we avoid the pitfalls of unbounded goroutine creation while ensuring efficient message processing.

Remember to always consider your specific use case when implementing such patterns. The configuration values shown here (worker count, timeout values, etc.) should be adjusted based on your requirements and resource constraints.



Tags: #golang #aws #sqs #concurrency #distributed-systems
