php小編蘋果為您介紹ES批量請求中的常見問題:`es_rejected_execution_exception`。在使用Elasticsearch進行批次請求時,有時會遇到這個異常。這個異常通常表示請求的同時數量超過了Elasticsearch伺服器的處理能力,導致請求被拒絕執行。本文將為您解析這個異常的原因,並給予解決方案,幫助您順利處理該問題。
我有一個大約5M 條目的切片(為簡單起見,假設每個條目都是一個位元組切片,它使用getIndexerItem
函數映射到索引器項目),我將其平均分配給200 個go 例程。然後每個go例程呼叫push
函數,切片長度為5M/200。
根據我對Refresh的理解:wait_for
,每當向elastic發出請求時,只有當該請求所做的更改對搜尋可見時才會完成(IMO將其轉換為不再有此特定請求的批次請求佇列) )。那為什麼我會收到此錯誤?
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
所有條目都將進入相同的索引,ankit-test
。
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem { return esutil.BulkIndexerItem{ Index: index, DocumentID: id, Body: bytes.NewReader(body), Action: "index", DocumentType: "logs", OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { fmt.Printf("error indexing item: %s\n", err.Error()) } else { fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason) } }, } } func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) { indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: e.client, Refresh: "wait_for", NumWorkers: 1, OnError: func(ctx context.Context, err error) { fmt.Printf("received onError %s\n", err.Error()) }, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %s", err) } ctx := context.Background() for _, d := range data { if err := indexer.Add(ctx, d); err != nil { fmt.Printf("error adding data to indexer: %s\n", err) } } if err := indexer.Close(ctx); err != nil { fmt.Printf("error flushing and closing indexer: %s\n", err) } indexerStats := indexer.Stats() return &indexerStats, nil }
假設沒有其他行程以任何方式與索引互動。
使用多個 ES 文檔,我能夠找到上述問題的解決方案。下面的回答是基於我的理解。如果您發現可以改進/修正的內容,請發表評論。
這是請求生命週期:
我的問題是我使用 refresh = false
(預設)發送請求。相反,應該使用 refresh = wait_for
。為什麼?刷新提供了3種模式:
refresh = true
便宜(就伺服器負載而言),但仍確保在發送迴響應之前請求已完成。請求已從節點佇列中刪除。 所有資料都被重定向到同一節點,並且由於 refresh = false
,在現有請求從佇列中清除之前返回了回應,這導致了溢出。
以上是ES批次請求`es_rejected_execution_exception`的詳細內容。更多資訊請關注PHP中文網其他相關文章!