Rumah >pembangunan bahagian belakang >Golang >Permintaan kelompok ES `es_rejected_execution_exception`
Editor PHP Apple memperkenalkan anda kepada masalah biasa dalam permintaan kelompok ES: `es_rejected_execution_exception`. Apabila menggunakan Elasticsearch untuk membuat permintaan kelompok, anda kadangkala menghadapi pengecualian ini. Pengecualian ini biasanya menunjukkan bahawa bilangan permintaan serentak melebihi kapasiti pemprosesan pelayan Elasticsearch, menyebabkan permintaan itu ditolak pelaksanaan. Artikel ini akan menganalisis punca pengecualian ini dan memberi anda penyelesaian untuk membantu anda menangani masalah dengan lancar.
Saya mempunyai sekeping kira-kira 5M entri (untuk kesederhanaan, anggap setiap entri adalah sekeping bait, ia menggunakan fungsi getIndexerItem
函数映射到索引器项),我将其平均分配给 200 个 go 例程。然后每个go例程调用push
dan panjang hirisan ialah 5M/200.
Menurut pemahaman saya Refresh的理解:wait_for
, setiap kali permintaan dibuat kepada anjal, ia hanya akan diselesaikan apabila perubahan yang dibuat oleh permintaan itu dapat dilihat oleh carian (IMO mengubahnya menjadi baris gilir permintaan kelompok yang tidak lagi mempunyai permintaan khusus ini). Jadi mengapa saya mendapat ralat ini?
error indexing item: es_rejected_execution_exception: rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh, target allocation id: someId, primary term: 1 on EsThreadPoolExecutor [ name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1 [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708] ]
Semua penyertaan akan masuk ke dalam indeks yang sama, ankit-test
.
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem { return esutil.BulkIndexerItem{ Index: index, DocumentID: id, Body: bytes.NewReader(body), Action: "index", DocumentType: "logs", OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { fmt.Printf("error indexing item: %s\n", err.Error()) } else { fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason) } }, } } func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) { indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: e.client, Refresh: "wait_for", NumWorkers: 1, OnError: func(ctx context.Context, err error) { fmt.Printf("received onError %s\n", err.Error()) }, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %s", err) } ctx := context.Background() for _, d := range data { if err := indexer.Add(ctx, d); err != nil { fmt.Printf("error adding data to indexer: %s\n", err) } } if err := indexer.Close(ctx); err != nil { fmt.Printf("error flushing and closing indexer: %s\n", err) } indexerStats := indexer.Stats() return &indexerStats, nil }
Andaikan tiada proses lain berinteraksi dengan indeks dalam apa jua cara.
Menggunakan berbilang dokumen ES, saya dapat mencari penyelesaian kepada masalah di atas. Jawapan di bawah adalah berdasarkan pemahaman saya. Jika anda melihat sesuatu yang boleh diperbaiki/dibetulkan, sila tinggalkan komen.
Ini ialah kitaran hayat permintaan:
Masalah saya ialah saya menggunakan refresh = false
(默认)发送请求。相反,应该使用 refresh = wait_for
. kenapa? Muat semula menawarkan 3 mod:
refresh = true
(dari segi beban pelayan), tetapi masih memastikan permintaan itu selesai sebelum menghantar semula respons. Permintaan telah dialih keluar daripada baris gilir nod. Semua data telah diubah hala ke nod yang sama dan disebabkan refresh = false
respons telah dikembalikan sebelum permintaan sedia ada dikosongkan daripada baris gilir, yang menyebabkan limpahan.
Atas ialah kandungan terperinci Permintaan kelompok ES `es_rejected_execution_exception`. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!