首頁 >後端開發 >Golang >ES批次請求`es_rejected_execution_exception`

ES批次請求`es_rejected_execution_exception`

WBOY
WBOY轉載
2024-02-11 21:50:071009瀏覽

ES批次請求`es_rejected_execution_exception`

php小編蘋果為您介紹ES批量請求中的常見問題:`es_rejected_execution_exception`。在使用Elasticsearch進行批次請求時,有時會遇到這個異常。這個異常通常表示請求的同時數量超過了Elasticsearch伺服器的處理能力,導致請求被拒絕執行。本文將為您解析這個異常的原因,並給予解決方案,幫助您順利處理該問題。

問題內容

我有一個大約5M 條目的切片(為簡單起見,假設每個條目都是一個位元組切片,它使用getIndexerItem 函數映射到索引器項目),我將其平均分配給200 個go 例程。然後每個go例程呼叫push函數,切片長度為5M/200。

根據我對Refresh的理解:wait_for,每當向elastic發出請求時,只有當該請求所做的更改對搜尋可見時才會完成(IMO將其轉換為不再有此特定請求的批次請求佇列) )。那為什麼我會收到此錯誤?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: 
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
    name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
    [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

所有條目都將進入相同的索引,ankit-test

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
    return esutil.BulkIndexerItem{
        Index:        index,
        DocumentID:   id,
        Body:         bytes.NewReader(body),
        Action:       "index",
        DocumentType: "logs",
        OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                fmt.Printf("error indexing item: %s\n", err.Error())
            } else {
                fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
            }
        },
    }
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
    indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Client: e.client,
        Refresh: "wait_for",
        NumWorkers: 1,
        OnError: func(ctx context.Context, err error) {
            fmt.Printf("received onError %s\n", err.Error())
        },
    })
    if err != nil {
        return nil, fmt.Errorf("error creating bulk indexer: %s", err)
    }

    ctx := context.Background()
    for _, d := range data {
        if err := indexer.Add(ctx, d); err != nil {
            fmt.Printf("error adding data to indexer: %s\n", err)
        }
    }
    if err := indexer.Close(ctx); err != nil {
        fmt.Printf("error flushing and closing indexer: %s\n", err)
    }

    indexerStats := indexer.Stats()
    return &indexerStats, nil
}

假設沒有其他行程以任何方式與索引互動。

解決方法

使用多個 ES 文檔,我能夠找到上述問題的解決方案。下面的回答是基於我的理解。如果您發現可以改進/修正的內容,請發表評論。

這是請求生命週期:

  1. golang es客戶端將多個請求合併為一個,並以單一批次請求的形式傳送到伺服器。單一批次請求可以包含發送至多個索引和分片的文件。
  2. 當批次請求到達叢集中的節點(也稱為協調節點)時,它會被整體放入批次佇列中,並由批次執行緒池中的執行緒進行處理。
  3. 協調節點根據文件需要路由到的分片來拆分批次請求。 每個批次子請求都會轉送到保存對應主分片的資料節點。批次子請求在該節點的批次佇列中排隊。如果佇列上沒有可用空間,則通知協調節點批次子請求已被拒絕。
  4. 所有子請求完成或被拒絕後,就會建立回應並將其傳回給客戶端。有可能甚至很可能批量請求中只有部分文件被拒絕。

我的問題是我使用 refresh = false (預設)發送請求。相反,應該使用 refresh = wait_for 。為什麼?刷新提供了3種模式:

  1. false:不執行與刷新相關的操作。此請求所做的更改將在請求返回後的某個時刻可見。在收到回應時,請求不必已完成。請求可能仍在節點的佇列中。
  2. true:操作發生後立即刷新相關主分片和副本分片。確保在發迴回應之前請求已完成。請求已從節點佇列中刪除。
  3. wait_for:等待請求所做的更改在回復之前透過刷新可見。與 true 不同的是,這不會強制立即刷新,而是等待刷新發生。比 refresh = true 便宜(就伺服器負載而言),但仍確保在發送迴響應之前請求已完成。請求已從節點佇列中刪除。

所有資料都被重定向到同一節點,並且由於 refresh = false,在現有請求從佇列中清除之前返回了回應,這導致了溢出。

以上是ES批次請求`es_rejected_execution_exception`的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除