Maison >développement back-end >Golang >Demande par lots ES `es_rejected_execution_exception`

Demande par lots ES `es_rejected_execution_exception`

WBOY
WBOYavant
2024-02-11 21:50:071009parcourir

Demande par lots ES `es_rejected_execution_exception`

Éditeur PHP Apple vous présente les problèmes courants dans les requêtes batch ES : `es_rejected_execution_exception`. Lorsque vous utilisez Elasticsearch pour effectuer des requêtes par lots, vous rencontrez parfois cette exception. Cette exception indique généralement que le nombre de requêtes simultanées dépasse la capacité de traitement du serveur Elasticsearch, entraînant le refus d'exécution de la requête. Cet article analysera la cause de cette exception et vous proposera une solution pour vous aider à gérer le problème en douceur.

Contenu de la question

J'ai une tranche d'environ 5 M d'entrées (pour plus de simplicité, supposons que chaque entrée est une tranche d'octets, elle utilise la fonction getIndexerItem 函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push et la longueur de la tranche est de 5 M/200.

D'après ma compréhension Refresh的理解:wait_for, chaque fois qu'une demande est faite à Elastic, elle ne sera complétée que lorsque les modifications apportées par cette demande seront visibles par la recherche (l'OMI transformant cela en une file d'attente de demandes par lots qui n'a plus cette demande spécifique). Alors pourquoi est-ce que j'obtiens cette erreur ?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: 
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
    name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
    [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

Toutes les entrées iront dans le même index, ankit-test.

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
    return esutil.BulkIndexerItem{
        Index:        index,
        DocumentID:   id,
        Body:         bytes.NewReader(body),
        Action:       "index",
        DocumentType: "logs",
        OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                fmt.Printf("error indexing item: %s\n", err.Error())
            } else {
                fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
            }
        },
    }
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
    indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Client: e.client,
        Refresh: "wait_for",
        NumWorkers: 1,
        OnError: func(ctx context.Context, err error) {
            fmt.Printf("received onError %s\n", err.Error())
        },
    })
    if err != nil {
        return nil, fmt.Errorf("error creating bulk indexer: %s", err)
    }

    ctx := context.Background()
    for _, d := range data {
        if err := indexer.Add(ctx, d); err != nil {
            fmt.Printf("error adding data to indexer: %s\n", err)
        }
    }
    if err := indexer.Close(ctx); err != nil {
        fmt.Printf("error flushing and closing indexer: %s\n", err)
    }

    indexerStats := indexer.Stats()
    return &indexerStats, nil
}

Supposons qu’aucun autre processus n’interagisse avec l’index de quelque manière que ce soit.

Solution

En utilisant plusieurs documents ES, j'ai pu trouver une solution au problème ci-dessus. La réponse ci-dessous est basée sur ma compréhension. Si vous voyez quelque chose qui pourrait être amélioré/corrigé, veuillez laisser un commentaire.

Voici le cycle de vie de la demande :

  1. golang es client fusionne plusieurs requêtes en une seule et les envoie au serveur en une seule requête par lots. Une seule demande par lots peut contenir des documents destinés à plusieurs index et partitions.
  2. Lorsqu'une requête par lots arrive sur un nœud du cluster (également appelé nœud coordinateur), elle est placée dans son ensemble dans la file d'attente par lots et traitée par les threads du pool de threads par lots.
  3. Le nœud de coordination divise la demande par lots en fonction de la partition vers laquelle le document doit être acheminé. Chaque sous-requête par lots est transmise au nœud de données contenant la partition principale correspondante. Les sous-requêtes groupées sont mises en file d'attente dans la file d'attente groupée du nœud. S'il n'y a pas d'espace libre dans la file d'attente, le nœud de coordination est informé que la sous-demande groupée a été rejetée.
  4. Une fois que toutes les sous-demandes ont été complétées ou rejetées, une réponse est créée et renvoyée au client. Il est possible, voire probable, que seuls certains documents de la demande groupée aient été rejetés.

Mon problème est que j'utilise refresh = false (默认)发送请求。相反,应该使用 refresh = wait_for . Pourquoi? Actualiser propose 3 modes :

  1. false : n'effectuez pas d'opérations liées à l'actualisation. Les modifications apportées par cette requête seront visibles à un moment donné après le retour de la requête. Il n’est pas nécessaire que la demande soit complétée au moment où la réponse est reçue. La requête peut toujours être dans la file d'attente du nœud.
  2. true : actualisez les fragments principaux et les fragments de réplique pertinents immédiatement après l'opération. Assurez-vous que la demande est complète avant de renvoyer une réponse. La requête a été supprimée de la file d'attente des nœuds.
  3. wait_for : Attendez que les modifications apportées par la demande soient visibles via l'actualisation avant de répondre. Contrairement à vrai, cela ne force pas une actualisation immédiate, mais attend plutôt que l'actualisation se produise. Moins cher que refresh = true (en termes de charge du serveur), mais garantit toujours que la demande est terminée avant de renvoyer une réponse. La requête a été supprimée de la file d'attente des nœuds.

Toutes les données ont été redirigées vers le même nœud et, grâce à refresh = false, la réponse a été renvoyée avant que la requête existante ne soit effacée de la file d'attente, ce qui a provoqué le débordement.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer