介紹
在建構分散式系統時,像 Amazon SQS 這樣的訊息佇列在處理非同步工作負載方面發揮著至關重要的作用。在這篇文章中,我將分享我在 Go 中實現強大的 SQS 消費者的經驗,該消費者可以處理 Keycloak 的用戶註冊事件。此解決方案使用扇出/扇入並發模式來有效處理訊息,而不會佔用系統資源。
挑戰
我遇到了一個有趣的問題:每天處理大約 50,000 個 SQS 事件以在 Keycloak 中註冊用戶。一種幼稚的方法可能會為每個訊息產生一個新的 goroutine,但這可能很快就會導致資源耗盡。我們需要一種更受控制的並發方法。
為什麼要扇出/扇入?
扇出/扇入模式非常適合此用例,因為它:
- 維護固定的工作協程池
- 在工人之間均勻分配工作
- 防止資源耗盡
- 提供對並發操作的更好控制
實施深入探討
1. 消費者結構
首先我們來看看我們的基本消費結構:
type Consumer struct { Client *sqs.Client QueueName string }
2. 訊息處理管道
實現由三個主要組件組成:
- 訊息接收者:不斷輪詢SQS以取得新訊息
- 工作池:處理訊息的 goroutine 數量固定
- 訊息通道:將接收者連接到工作人員
這是我們啟動消費者的方式:
func StartPool[requestBody any]( serviceFunc func(c context.Context, dto *requestBody) error, consumer *Consumer) { ctx := context.Background() params := &sqs.ReceiveMessageInput{ MaxNumberOfMessages: 10, QueueUrl: aws.String(consumer.QueueName), WaitTimeSeconds: 20, VisibilityTimeout: 30, MessageAttributeNames: []string{ string(types.QueueAttributeNameAll), }, } msgCh := make(chan types.Message) var wg sync.WaitGroup // Start worker pool first startPool(ctx, msgCh, &wg, consumer, serviceFunc) // Then start receiving messages // ... rest of the implementation }
3. 關鍵配置參數
讓我們檢查一下關鍵的 SQS 設定參數:
- MaxNumberOfMessages (10):每次輪詢的批次大小
- WaitTimeSeconds (20):長輪詢持續時間
- VisibilityTimeout (30):訊息處理的寬限期
4. 工作池實施
工作池是扇出模式發揮作用的地方:
func startPool[requestBody any]( ctx context.Context, msgCh chan types.Message, wg *sync.WaitGroup, consumer *Consumer, serviceFunc func(c context.Context, dto *requestBody) error) { processingMessages := &sync.Map{} // Start 10 workers for i := 0; i <h3> 5. 重複訊息處理 </h3> <p>我們使用sync.Map來防止處理重複訊息:<br> </p><pre class="brush:php;toolbar:false">type Consumer struct { Client *sqs.Client QueueName string }
最佳實踐和學習
- 錯誤處理:始終優雅地處理錯誤並適當記錄它們
- 訊息清理:僅在成功處理後刪除訊息
- 優雅關機:使用上下文實現正確的關閉機制
- 監控:在關鍵點新增日誌記錄以提高可觀察性
性能考慮因素
- 工作人員數量:根據您的工作負載和可用資源進行選擇
- 批次大小:吞吐量和處理時間之間的平衡
- 可見性超時:依照您的平均處理時間設定
未來的改進
- 動態工作人員擴充:依佇列深度調整工作人員數量
- 斷路器:為下游服務增加斷路器
- Metrics Collection:新增 Prometheus 指標進行監控
- 死信佇列:對失敗訊息實施DLQ處理
- 重試:為瞬時失敗增加指數退避
結論
扇出/扇入模式為在 Go 中處理大量 SQS 訊息提供了一個優雅的解決方案。透過維護固定的工作池,我們可以避免無限制的 goroutine 創建的陷阱,同時確保高效的訊息處理。
請記住在實現此類模式時始終考慮您的特定用例。此處顯示的配置值(工作執行緒數、逾時值等)應根據您的要求和資源限制進行調整。
原始碼:[連結到您的儲存庫(如果有)]
標籤:#golang #aws #sqs #concurrency #distributed-systems
以上是用 Go 建構可擴展的 SQS 消費者的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本文演示了創建模擬和存根進行單元測試。 它強調使用接口,提供模擬實現的示例,並討論最佳實踐,例如保持模擬集中並使用斷言庫。 文章

本文探討了GO的仿製藥自定義類型約束。 它詳細介紹了界面如何定義通用功能的最低類型要求,從而改善了類型的安全性和代碼可重複使用性。 本文還討論了局限性和最佳實踐

本文使用跟踪工具探討了GO應用程序執行流。 它討論了手冊和自動儀器技術,比較諸如Jaeger,Zipkin和Opentelemetry之類的工具,並突出顯示有效的數據可視化

本文討論了GO的反思軟件包,用於運行時操作代碼,對序列化,通用編程等有益。它警告性能成本,例如較慢的執行和更高的內存使用,建議明智的使用和最佳

本文討論了通過go.mod,涵蓋規範,更新和衝突解決方案管理GO模塊依賴關係。它強調了最佳實踐,例如語義版本控制和定期更新。

本文討論了GO中使用表驅動的測試,該方法使用測試用例表來測試具有多個輸入和結果的功能。它突出了諸如提高的可讀性,降低重複,可伸縮性,一致性和A


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

SAP NetWeaver Server Adapter for Eclipse
將Eclipse與SAP NetWeaver應用伺服器整合。

Atom編輯器mac版下載
最受歡迎的的開源編輯器

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

SecLists
SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。