Heim >Backend-Entwicklung >Golang >ES-Batch-Anfrage „es_rejected_execution_Exception'.

ES-Batch-Anfrage „es_rejected_execution_Exception'.

WBOY
WBOYnach vorne
2024-02-11 21:50:071044Durchsuche

ES-Batch-Anfrage „es_rejected_execution_Exception.

PHP-Editor Apple stellt Ihnen häufige Probleme bei ES-Batch-Anfragen vor: „es_rejected_execution_Exception“. Wenn Sie Elasticsearch zum Erstellen von Batch-Anfragen verwenden, tritt diese Ausnahme manchmal auf. Diese Ausnahme weist normalerweise darauf hin, dass die Anzahl der gleichzeitigen Anforderungen die Verarbeitungskapazität des Elasticsearch-Servers überschreitet, was dazu führt, dass die Ausführung der Anforderung verweigert wird. Dieser Artikel analysiert die Ursache dieser Ausnahme und bietet Ihnen eine Lösung, die Ihnen hilft, das Problem reibungslos zu lösen.

Frageninhalt

Ich habe ein Segment mit etwa 5 Millionen Einträgen (der Einfachheit halber nehmen wir an, dass jeder Eintrag ein Byte-Slice ist, die Funktion getIndexerItem 函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push verwendet und die Slice-Länge 5 M/200 beträgt.

Nach meinem Verständnis Refresh的理解:wait_for wird eine Anfrage an Elastic erst dann abgeschlossen, wenn die durch diese Anfrage vorgenommenen Änderungen für die Suche sichtbar sind (IMO verwandelt dies in eine Batch-Anfragewarteschlange, die diese spezifische Anfrage nicht mehr enthält). Warum erhalte ich diesen Fehler?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: 
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
    name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
    [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

Alle Einträge werden in denselben Index aufgenommen, ankit-test.

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
    return esutil.BulkIndexerItem{
        Index:        index,
        DocumentID:   id,
        Body:         bytes.NewReader(body),
        Action:       "index",
        DocumentType: "logs",
        OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                fmt.Printf("error indexing item: %s\n", err.Error())
            } else {
                fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
            }
        },
    }
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
    indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Client: e.client,
        Refresh: "wait_for",
        NumWorkers: 1,
        OnError: func(ctx context.Context, err error) {
            fmt.Printf("received onError %s\n", err.Error())
        },
    })
    if err != nil {
        return nil, fmt.Errorf("error creating bulk indexer: %s", err)
    }

    ctx := context.Background()
    for _, d := range data {
        if err := indexer.Add(ctx, d); err != nil {
            fmt.Printf("error adding data to indexer: %s\n", err)
        }
    }
    if err := indexer.Close(ctx); err != nil {
        fmt.Printf("error flushing and closing indexer: %s\n", err)
    }

    indexerStats := indexer.Stats()
    return &indexerStats, nil
}

Gehen Sie davon aus, dass keine anderen Prozesse in irgendeiner Weise mit dem Index interagieren.

Lösung

Durch die Verwendung mehrerer ES-Dokumente konnte ich eine Lösung für das oben genannte Problem finden. Die folgende Antwort basiert auf meinem Verständnis. Wenn Sie etwas sehen, das verbessert/korrigiert werden könnte, hinterlassen Sie bitte einen Kommentar.

Dies ist der Anforderungslebenszyklus:

    Der
  1. golang es-Client führt mehrere Anfragen zu einer zusammen und sendet sie in einer einzigen Batch-Anfrage an den Server. Eine einzelne Batch-Anfrage kann Dokumente enthalten, die für mehrere Indizes und Shards bestimmt sind.
  2. Wenn eine Batch-Anfrage bei einem Knoten im Cluster (auch Koordinatorknoten genannt) eintrifft, wird sie als Ganzes in die Batch-Warteschlange gestellt und von Threads im Batch-Thread-Pool verarbeitet.
  3. Der koordinierende Knoten teilt die Stapelanforderung basierend auf dem Shard auf, an den das Dokument weitergeleitet werden muss. Jede Batch-Unteranforderung wird an den Datenknoten weitergeleitet, der den entsprechenden primären Shard enthält. Massenunteranfragen werden in der Massenwarteschlange des Knotens eingereiht. Wenn in der Warteschlange kein freier Speicherplatz vorhanden ist, wird der koordinierende Knoten benachrichtigt, dass die Massenunteranforderung abgelehnt wurde.
  4. Sobald alle Unteranfragen abgeschlossen oder abgelehnt wurden, wird eine Antwort erstellt und an den Kunden zurückgesendet. Es ist möglich, sogar wahrscheinlich, dass nur einige der Dokumente in der Sammelanforderung abgelehnt wurden.

Mein Problem ist, dass ich refresh = false (默认)发送请求。相反,应该使用 refresh = wait_for verwende. Warum? Refresh bietet 3 Modi:

  1. false: Führen Sie keine aktualisierungsbezogenen Vorgänge aus. Die durch diese Anfrage vorgenommenen Änderungen werden irgendwann nach der Rückkehr der Anfrage sichtbar sein. Die Anfrage muss bis zum Eingang der Antwort noch nicht abgeschlossen sein. Die Anfrage befindet sich möglicherweise noch in der Warteschlange des Knotens.
  2. true: Aktualisieren Sie die relevanten primären Shards und Replikat-Shards unmittelbar nach dem Vorgang. Stellen Sie sicher, dass die Anfrage vollständig ist, bevor Sie eine Antwort zurücksenden. Die Anfrage wurde aus der Knotenwarteschlange entfernt.
  3. wait_for: Warten Sie, bis die durch die Anfrage vorgenommenen Änderungen per Aktualisierung sichtbar sind, bevor Sie antworten. Im Gegensatz zu „true“ erzwingt dies keine sofortige Aktualisierung, sondern wartet, bis die Aktualisierung erfolgt. Günstiger als refresh = true (im Hinblick auf die Serverlast), stellt aber dennoch sicher, dass die Anfrage abgeschlossen wird, bevor eine Antwort zurückgesendet wird. Die Anfrage wurde aus der Knotenwarteschlange entfernt.

Alle Daten wurden an denselben Knoten umgeleitet und aufgrund von refresh = false wurde die Antwort zurückgegeben, bevor die vorhandene Anforderung aus der Warteschlange gelöscht wurde, was den Überlauf verursachte.

Das obige ist der detaillierte Inhalt vonES-Batch-Anfrage „es_rejected_execution_Exception'.. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen