最初發佈在我的部落格
預設情況下,使用 Kubebuilder 和控制器運行時建立的運算子一次處理一個協調請求。這是一個明智的設置,因為操作員開發人員可以更輕鬆地推理和調試應用程式中的邏輯。它還限制了從控制器到 ectd 和 API 伺服器等核心 Kubernetes 資源的吞吐量。
但是,如果您的工作佇列開始備份,並且由於請求留在佇列中等待處理而導致平均協調時間增加,該怎麼辦?對我們來說幸運的是,控制器運行時 Controller 結構包含一個 MaxConcurrentReconciles 字段(正如我之前在 Kubebuilder Tips 文章中提到的)。此選項可讓您設定在單一控制器中運行的並發協調循環的數量。因此,如果值大於 1,您可以同時協調多個 Kubernetes 資源。
在我的 Operator 旅程的早期,我遇到的一個問題是我們如何保證同一資源不會同時被 2 個或更多 Goroutine 協調?將MaxConcurrentReconciles 設為高於1 時,這可能會導致各種競爭條件和不良行為,因為協調循環內的物件狀態可能會因外部來源(在不同執行緒中運行的協調循環)的副作用而改變.
我對此思考了一段時間,甚至實作了一種基於sync.Map 的方法,該方法允許 goroutine 取得給定資源的鎖(基於其命名空間/名稱)。
事實證明,所有這些努力都是徒勞的,因為我最近(在 k8s slack 通道中)了解到控制器工作佇列已經包含此功能!儘管實作更簡單。
這是一個關於 k8s 控制器的工作佇列如何保證唯一資源按順序協調的簡短故事。因此,即使 MaxConcurrentReconciles 設定為高於 1,您也可以確信一次只有一個協調函數作用於任何給定資源。
Controller-runtime 使用 client-go/util/workqueue 函式庫來實現其底層協調佇列。在套件的 doc.go 檔案中,註解指出工作佇列支援以下屬性:
等一下...我的答案就在第二個項目符號中,「吝嗇」屬性!根據這些文檔,隊列將自動為我處理這個並發問題,而無需編寫一行程式碼。讓我們來看看具體的實作。
workqueue 結構體有 3 種主要方法,Add、Get 和 Done。在控制器內部,通知者會將協調請求(通用 k8s 資源的命名空間名稱)加入到工作佇列中。在單獨的 goroutine 中執行的協調循環將從佇列中取得下一個請求(如果佇列為空則阻塞)。此循環將執行控制器中編寫的任何自訂邏輯,然後控制器將呼叫佇列上的 Done,並將協調請求作為參數傳遞。這將再次開始該過程,並且協調循環將呼叫 Get 來檢索下一個工作項目。
這類似於在RabbitMQ 中處理訊息,工作人員從佇列中彈出一個項目,對其進行處理,然後將「Ack」發送回訊息代理,表明處理已完成,並且可以安全地從佇列中刪除該項目隊列。
不過,我有一個在生產環境中運行的操作員為 QuestDB Cloud 的基礎設施提供支持,並且希望確保工作隊列按照廣告中的方式工作。因此,a 編寫了一個快速測試來驗證其行為。
這是一個驗證「Stingy」屬性的簡單測試:
package main_test import ( "testing" "github.com/stretchr/testify/assert" "k8s.io/client-go/util/workqueue" ) func TestWorkqueueStingyProperty(t *testing.T) { type Request int // Create a new workqueue and add a request wq := workqueue.New() wq.Add(Request(1)) assert.Equal(t, wq.Len(), 1) // Subsequent adds of an identical object // should still result in a single queued one wq.Add(Request(1)) wq.Add(Request(1)) assert.Equal(t, wq.Len(), 1) // Getting the object should remove it from the queue // At this point, the controller is processing the request obj, _ := wq.Get() req := obj.(Request) assert.Equal(t, wq.Len(), 0) // But re-adding an identical request before it is marked as "Done" // should be a no-op, since we don't want to process it simultaneously // with the first one wq.Add(Request(1)) assert.Equal(t, wq.Len(), 0) // Once the original request is marked as Done, the second // instance of the object will be now available for processing wq.Done(req) assert.Equal(t, wq.Len(), 1) // And since it is available for processing, it will be // returned by a Get call wq.Get() assert.Equal(t, wq.Len(), 0) }
由於工作佇列在底層使用互斥體,因此這種行為是執行緒安全的。因此,即使我編寫了更多使用多個 goroutine 同時高速讀寫隊列的測試來試圖破壞它,工作隊列的實際行為也將與我們的單線程測試相同。
Kubernetes 標準庫中隱藏著許多像這樣的小寶石,其中一些位於不太明顯的地方(例如在 go 客戶端套件中找到的控制器運行時工作佇列)。儘管有這個發現,以及我過去所做的其他類似的發現,我仍然覺得我之前解決這些問題的嘗試並不是完全浪費時間。它們迫使您批判性地思考分散式系統運算中的基本問題,並幫助您更多地了解幕後發生的事情。因此,當我發現「Kubernetes 做到了」時,我鬆了一口氣,因為我可以簡化我的程式碼庫,或許還可以刪除一些不必要的單元測試。
以上是Kubernetes Operator 如何處理並發?的詳細內容。更多資訊請關注PHP中文網其他相關文章!