ES bulk request `es_rejected_execution_exception`

WBOY (repost)
2024-02-11 21:50:07

A common problem with Elasticsearch bulk requests is `es_rejected_execution_exception`. It usually means that more write requests are arriving than the Elasticsearch node can queue and process, so some of them are refused. This article looks at the cause of the exception and at one way to resolve it.

Question content

I have a slice of about 5M entries (for simplicity, assume each entry is a byte slice that the getIndexerItem function maps to an indexer item). I divide it evenly among 200 goroutines. Each goroutine then calls the push function with a slice of length 5M/200.

From my understanding of Refresh: wait_for, a request made to Elasticsearch completes only once the changes made by that request are visible to search (which, in my opinion, means the request is no longer in the bulk request queue). So why am I getting this error?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: 
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
    name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
    [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

All entries will go into the same index, ankit-test.

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
    return esutil.BulkIndexerItem{
        Index:        index,
        DocumentID:   id,
        Body:         bytes.NewReader(body),
        Action:       "index",
        DocumentType: "logs",
        OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                fmt.Printf("error indexing item: %s\n", err.Error())
            } else {
                fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
            }
        },
    }
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
    indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Client: e.client,
        Refresh: "wait_for",
        NumWorkers: 1,
        OnError: func(ctx context.Context, err error) {
            fmt.Printf("received onError %s\n", err.Error())
        },
    })
    if err != nil {
        return nil, fmt.Errorf("error creating bulk indexer: %s", err)
    }

    ctx := context.Background()
    for _, d := range data {
        if err := indexer.Add(ctx, d); err != nil {
            fmt.Printf("error adding data to indexer: %s\n", err)
        }
    }
    if err := indexer.Close(ctx); err != nil {
        fmt.Printf("error flushing and closing indexer: %s\n", err)
    }

    indexerStats := indexer.Stats()
    return &indexerStats, nil
}

Assume no other processes interact with the index in any way.

Solution

After going through several pieces of the Elasticsearch documentation, I was able to find a solution to the problem above. The answer below is based on my own understanding. If you see something that could be improved or corrected, please leave a comment.

This is the request life cycle:

  1. The Go Elasticsearch client merges multiple requests into one and sends them to the server as a single bulk request. A single bulk request can contain documents destined for multiple indices and shards.
  2. When a bulk request arrives at a node in the cluster (the coordinating node for that request), it is placed into the bulk queue as a whole and processed by the threads of the bulk thread pool.
  3. The coordinating node splits the bulk request according to the shards to which the documents need to be routed. Each bulk subrequest is forwarded to the data node that holds the corresponding primary shard, where it is placed in that node's bulk queue. If the queue has no free space, the coordinating node is notified that the bulk subrequest has been rejected.
  4. Once all subrequests have completed or been rejected, a response is created and returned to the client. It is possible, even likely, that only some of the documents in the bulk request were rejected.
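Because of step 4, a client cannot treat a bulk response as all-or-nothing and has to look at the per-item results. The sketch below decodes a bulk response body with the standard library and collects the IDs of the rejected documents. The rejectedIDs helper and the sample body are illustrative assumptions (the sample is hand-written, not captured from a cluster); the only fields it relies on are `errors`, `items`, `_id`, `status`, and `error.type` from the bulk response.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItem holds the per-document fields this sketch cares about.
type bulkItem struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
}

// bulkResponse mirrors the top level of a bulk response. Each element of
// Items is an object with a single key naming the action ("index", "create", ...).
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

// rejectedIDs returns the IDs of documents that failed with
// es_rejected_execution_exception. Hypothetical helper for illustration.
func rejectedIDs(raw []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, fmt.Errorf("decoding bulk response: %w", err)
	}
	var ids []string
	for _, item := range resp.Items {
		for _, res := range item {
			if res.Error != nil && res.Error.Type == "es_rejected_execution_exception" {
				ids = append(ids, res.ID)
			}
		}
	}
	return ids, nil
}

func main() {
	// Hand-written sample: one document indexed, one rejected.
	body := []byte(`{"took":3,"errors":true,"items":[
		{"index":{"_id":"1","status":201}},
		{"index":{"_id":"2","status":429,"error":{"type":"es_rejected_execution_exception","reason":"queue is full"}}}
	]}`)
	ids, err := rejectedIDs(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("rejected document IDs:", ids)
}
```

In the code from the question, the OnFailure callback of esutil.BulkIndexerItem plays this role: it is called once per failed item with that item's error type and reason.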

My problem was that I was sending the requests with refresh = false (the default). The fix is to use refresh = wait_for instead. Why? Refresh offers three modes:

  1. false: Take no refresh-related action. The changes made by this request become visible at some point after the request returns. The request does not have to be complete by the time the response is received; it may still be in the node's queue.
  2. true: Refresh the relevant primary and replica shards immediately after the operation occurs. This ensures that the request is complete before the response is sent back, so the request has already been removed from the node's queue.
  3. wait_for: Wait for the changes made by the request to become visible through a refresh before replying. Unlike true, this does not force an immediate refresh; it waits for the next refresh to happen. It is cheaper (in terms of server load) than refresh = true but still ensures that the request is complete before the response is sent back, so the request has already been removed from the node's queue.
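At the HTTP level, the mode is chosen with the `refresh` query parameter of the bulk endpoint. As a minimal sketch, the hypothetical bulkURL helper below builds such a URL with the standard library and refuses any value other than the three listed above; the base address is an assumed local node.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// bulkURL returns the _bulk endpoint under base with the given refresh mode.
// Hypothetical helper for illustration; it is not part of any client library.
func bulkURL(base, refresh string) (string, error) {
	switch refresh {
	case "true", "false", "wait_for":
		// These are the three documented refresh modes.
	default:
		return "", fmt.Errorf("unsupported refresh value %q", refresh)
	}
	u, err := url.Parse(base)
	if err != nil {
		return "", fmt.Errorf("parsing base URL: %w", err)
	}
	u.Path = strings.TrimSuffix(u.Path, "/") + "/_bulk"
	q := u.Query()
	q.Set("refresh", refresh)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	// Assumed address of a local node.
	endpoint, err := bulkURL("http://localhost:9200", "wait_for")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(endpoint) // http://localhost:9200/_bulk?refresh=wait_for
}
```

With the esutil bulk indexer used in the question, the same setting is passed through the Refresh field of esutil.BulkIndexerConfig.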

All of the data was routed to the same node, and because refresh = false, responses were returned before the earlier requests had been cleared from the queue. New requests kept arriving while the queue was still full, which caused the overflow.

Statement:
This article is reproduced from stackoverflow.com. If there is any infringement, please contact admin@php.cn to have it removed.