Maison >développement back-end >Golang >Demande par lots ES `es_rejected_execution_exception`
Éditeur PHP Apple vous présente les problèmes courants dans les requêtes batch ES : `es_rejected_execution_exception`. Lorsque vous utilisez Elasticsearch pour effectuer des requêtes par lots, vous rencontrez parfois cette exception. Cette exception indique généralement que le nombre de requêtes simultanées dépasse la capacité de traitement du serveur Elasticsearch, entraînant le refus d'exécution de la requête. Cet article analysera la cause de cette exception et vous proposera une solution pour vous aider à gérer le problème en douceur.
J'ai une tranche d'environ 5 M d'entrées (pour plus de simplicité, supposons que chaque entrée est une tranche d'octets, elle utilise la fonction getIndexerItem
函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push
et la longueur de la tranche est de 5 M/200.
D'après ma compréhension Refresh的理解:wait_for
, chaque fois qu'une demande est faite à Elastic, elle ne sera complétée que lorsque les modifications apportées par cette demande seront visibles par la recherche (l'OMI transformant cela en une file d'attente de demandes par lots qui n'a plus cette demande spécifique). Alors pourquoi est-ce que j'obtiens cette erreur ?
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
Toutes les entrées iront dans le même index, ankit-test
.
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem { return esutil.BulkIndexerItem{ Index: index, DocumentID: id, Body: bytes.NewReader(body), Action: "index", DocumentType: "logs", OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { fmt.Printf("error indexing item: %s\n", err.Error()) } else { fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason) } }, } } func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) { indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: e.client, Refresh: "wait_for", NumWorkers: 1, OnError: func(ctx context.Context, err error) { fmt.Printf("received onError %s\n", err.Error()) }, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %s", err) } ctx := context.Background() for _, d := range data { if err := indexer.Add(ctx, d); err != nil { fmt.Printf("error adding data to indexer: %s\n", err) } } if err := indexer.Close(ctx); err != nil { fmt.Printf("error flushing and closing indexer: %s\n", err) } indexerStats := indexer.Stats() return &indexerStats, nil }
Supposons qu’aucun autre processus n’interagisse avec l’index de quelque manière que ce soit.
En utilisant plusieurs documents ES, j'ai pu trouver une solution au problème ci-dessus. La réponse ci-dessous est basée sur ma compréhension. Si vous voyez quelque chose qui pourrait être amélioré/corrigé, veuillez laisser un commentaire.
Voici le cycle de vie de la demande :
Mon problème est que j'utilise refresh = false
(默认)发送请求。相反,应该使用 refresh = wait_for
. Pourquoi? Actualiser propose 3 modes :
refresh = true
(en termes de charge du serveur), mais garantit toujours que la demande est terminée avant de renvoyer une réponse. La requête a été supprimée de la file d'attente des nœuds. Toutes les données ont été redirigées vers le même nœud et, grâce à refresh = false
, la réponse a été renvoyée avant que la requête existante ne soit effacée de la file d'attente, ce qui a provoqué le débordement.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!