首頁  >  文章  >  後端開發  >  Kubernetes Operator 如何處理並發?

Kubernetes Operator 如何處理並發?

Barbara Streisand
Barbara Streisand原創
2024-10-10 06:07:29283瀏覽

最初發佈在我的部落格

預設情況下,使用 Kubebuilder 和控制器運行時建立的運算子一次處理一個協調請求。這是一個明智的設置,因為操作員開發人員可以更輕鬆地推理和調試應用程式中的邏輯。它還限制了從控制器到 ectd 和 API 伺服器等核心 Kubernetes 資源的吞吐量。

但是,如果您的工作佇列開始備份,並且由於請求留在佇列中等待處理而導致平均協調時間增加,該怎麼辦?對我們來說幸運的是,控制器運行時 Controller 結構包含一個 MaxConcurrentReconciles 字段(正如我之前在 Kubebuilder Tips 文章中提到的)。此選項可讓您設定在單一控制器中運行的並發協調循環的數量。因此,如果值大於 1,您可以同時協調多個 Kubernetes 資源。

在我的 Operator 旅程的早期,我遇到的一個問題是我們如何保證同一資源不會同時被 2 個或更多 Goroutine 協調?將MaxConcurrentReconciles 設為高於1 時,這可能會導致各種競爭條件和不良行為,因為協調循環內的物件狀態可能會因外部來源(在不同執行緒中運行的協調循環)的副作用而改變.

我對此思考了一段時間,甚至實作了一種基於sync.Map 的方法,該方法允許 goroutine 取得給定資源的鎖(基於其命名空間/名稱)。

事實證明,所有這些努力都是徒勞的,因為我最近(在 k8s slack 通道中)了解到控制器工作佇列已經包含此功能!儘管實作更簡單。

這是一個關於 k8s 控制器的工作佇列如何保證唯一資源按順序協調的簡短故事。因此,即使 MaxConcurrentReconciles 設定為高於 1,您也可以確信一次只有一個協調函數作用於任何給定資源。

客戶端/util

Controller-runtime 使用 client-go/util/workqueue 函式庫來實現其底層協調佇列。在套件的 doc.go 檔案中,註解指出工作佇列支援以下屬性:

  • 公平:按照新增順序處理項目。
  • 吝嗇:單一item不會被同時處理多次,如果一個item在被處理之前被多次添加,那麼它只會被處理一次。
  • 多位消費者和生產者。特別是,允許在處理項目時重新排隊。
  • 關閉通知。

等一下...我的答案就在第二個項目符號中,「吝嗇」屬性!根據這些文檔,隊列將自動為我處理這個並發問題,而無需編寫一行程式碼。讓我們來看看具體的實作。

工作隊列如何運作?

workqueue 結構體有 3 種主要方法,Add、Get 和 Done。在控制器內部,通知者會將協調請求(通用 k8s 資源的命名空間名稱)加入到工作佇列中。在單獨的 goroutine 中執行的協調循環將從佇列中取得下一個請求(如果佇列為空則阻塞)。此循環將執行控制器中編寫的任何自訂邏輯,然後控制器將呼叫佇列上的 Done,並將協調請求作為參數傳遞。這將再次開始該過程,並且協調循環將呼叫 Get 來檢索下一個工作項目。

這類似於在RabbitMQ 中處理訊息,工作人員從佇列中彈出一個項目,對其進行處理,然後將「Ack」發送回訊息代理,表明處理已完成,並且可以安全地從佇列中刪除該項目隊列。

不過,我有一個在生產環境中運行的操作員為 QuestDB Cloud 的基礎設施提供支持,並且希望確保工作隊列按照廣告中的方式工作。因此,a 編寫了一個快速測試來驗證其行為。

一點測試

這是一個驗證「Stingy」屬性的簡單測試:

package main_test

import (
    "testing"

    "github.com/stretchr/testify/assert"

    "k8s.io/client-go/util/workqueue"
)

func TestWorkqueueStingyProperty(t *testing.T) {

    type Request int

    // Create a new workqueue and add a request
    wq := workqueue.New()
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 1)

    // Subsequent adds of an identical object
    // should still result in a single queued one
    wq.Add(Request(1))
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 1)

    // Getting the object should remove it from the queue
    // At this point, the controller is processing the request
    obj, _ := wq.Get()
    req := obj.(Request)
    assert.Equal(t, wq.Len(), 0)

    // But re-adding an identical request before it is marked as "Done"
    // should be a no-op, since we don't want to process it simultaneously
    // with the first one
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 0)

    // Once the original request is marked as Done, the second
    // instance of the object will be now available for processing
    wq.Done(req)
    assert.Equal(t, wq.Len(), 1)

    // And since it is available for processing, it will be
    // returned by a Get call
    wq.Get()
    assert.Equal(t, wq.Len(), 0)
}

由於工作佇列在底層使用互斥體,因此這種行為是執行緒安全的。因此,即使我編寫了更多使用多個 goroutine 同時高速讀寫隊列的測試來試圖破壞它,工作隊列的實際行為也將與我們的單線程測試相同。

一切並沒有失去

How do Kubernetes Operators Handle Concurrency?

Kubernetes 標準庫中隱藏著許多像這樣的小寶石,其中一些位於不太明顯的地方(例如在 go 客戶端套件中找到的控制器運行時工作佇列)。儘管有這個發現,以及我過去所做的其他類似的發現,我仍然覺得我之前解決這些問題的嘗試並不是完全浪費時間。它們迫使您批判性地思考分散式系統運算中的基本問題,並幫助您更多地了解幕後發生的事情。因此,當我發現「Kubernetes 做到了」時,我鬆了一口氣,因為我可以簡化我的程式碼庫,或許還可以刪除一些不必要的單元測試。

以上是Kubernetes Operator 如何處理並發?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn