ホームページ >バックエンド開発 >Golang >外部マージ問題 - Gopher の完全ガイド

外部マージ問題 - Gopher の完全ガイド

Susan Sarandon
Susan Sarandonオリジナル
2025-01-12 08:09:42409ブラウズ

外部並べ替え問題は、コンピューター サイエンスのコースではよく知られたトピックであり、教育ツールとしてよく使用されます。ただし、必要な最適化に取り組むことはおろか、特定の技術シナリオのコードでこの問題の解決策を実際に実装した人に出会うことはほとんどありません。ハッカソン中にこの課題に遭遇したことが、この記事を書くきっかけになりました。

それでは、ハッカソンのタスクは次のとおりです:

IPv4 アドレスを含む単純なテキスト ファイルがあります。 1 行が 1 つのアドレスで、行ごとに次のようになります:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

ファイルのサイズは無制限で、数十ギガバイト、数百ギガバイトを占有する可能性があります。

このファイル内の固有のアドレスの数は、できるだけ少ないメモリと時間を使用して計算する必要があります。この問題を解決するための「単純な」アルゴリズムがあります (行ごとに読み取り、行を HashSet に入力します)。この単純なアルゴリズムよりも実装が複雑で高速な方が良いです。

80 億行を含む 120 GB のファイルが解析のために送信されました。

プログラムの実行速度に関する特別な要件はありませんでした。ただし、このトピックに関するオンラインで入手可能な情報を迅速に検討した結果、標準的なハードウェア (家庭用 PC など) で許容できる実行時間は約 1 時間以下であると結論付けました。

明らかな理由により、システムに少なくとも 128 GB の利用可能なメモリがなければ、ファイル全体を読み取って処理することはできません。しかし、チャンクの操作とマージは避けられないのでしょうか?

外部マージの実装に慣れていない場合は、最適とは程遠いものの、許容できる代替ソリューションに慣れることをお勧めします。

アイデア

  • 2^32 ビットのビットマップを作成します。 uint64 には 64 ビットが含まれるため、これは uint64 配列です。

  • 各 IP:

  1. 文字列アドレスを 4 オクテットに解析します: A.B.C.D.
  2. それを数値に変換します ipNum = (A << 24) | (B << 16) | (C << 8) | D.
  3. ビットマップ内の対応するビットを設定します。
  • 1.すべてのアドレスを読み取った後、ビットマップを実行し、設定されたビットの数をカウントします。

長所:
非常に高速な一意性検出: ビット O(1) を設定します。チェックする必要はなく、設定するだけです。

ハッシュ、ソートなどのオーバーヘッドはありません
短所:
大量のメモリ消費 (オーバーヘッドを考慮しない場合、IPv4 スペース全体で 512 MB)。

ファイルが大きくても、IPv4 スペース全体よりも小さい場合、時間の点では有利になる可能性がありますが、メモリの点では必ずしも合理的であるとは限りません。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

このアプローチは簡単で信頼性が高く、代替手段が利用できない場合に実行可能な選択肢になります。ただし、運用環境では、特に最適なパフォーマンスの達成を目指す場合には、より効率的なソリューションを開発することが不可欠です。

したがって、私たちのアプローチには、チャンク化、内部マージソート、および重複排除が含まれます。

外部ソートにおける並列化の原理

  1. チャンクの読み取りと変換:

ファイルは、数百メガバイトまたは数ギガバイトなどの比較的小さな部分 (チャンク) に分割されます。各チャンクについて:

  • ゴルーチン (またはゴルーチンのプール) が起動され、チャンクが読み取られ、IP アドレスが数値に解析され、メモリ内の一時配列に格納されます。

  • 次に、この配列は (たとえば、標準の sort.Slice を使用して) ソートされ、重複を削除した結果が一時ファイルに書き込まれます。

各部分は独立して処理できるため、複数の CPU コアと十分なディスク帯域幅がある場合は、このようなハンドラーを複数並行して実行できます。これにより、リソースを可能な限り効率的に使用できるようになります。

  1. ソートされたチャンクをマージします (マージステップ):

すべてのチャンクがソートされて一時ファイルに書き込まれたら、これらのソートされたリストを 1 つのソートされたストリームにマージし、重複を削除する必要があります。

  • 外部の並べ替えプロセスと同様に、複数の一時ファイルをグループに分割し、それらを並行してマージし、徐々にファイル数を減らすことで、マージを並列化できます。

  • これにより、ソートおよび重複排除された 1 つの大きな出力ストリームが残り、そこから一意の IP の総数を計算できます。

並列化の利点:

  • 複数の CPU コアの使用:
    非常に大きな配列のシングルスレッドソートは遅くなる可能性がありますが、マルチコアプロセッサを使用している場合は、複数のチャンクを並行してソートできるため、プロセスが数倍高速化されます。

  • 負荷分散:

チャンク サイズが賢明に選択されている場合、各チャンクはほぼ同じ時間で処理できます。一部のチャンクが大きい/小さい場合、またはより複雑な場合は、その処理を異なるゴルーチン間で動的に分散できます。

  • IO の最適化:

並列化により、1 つのチャンクを読み取っている間に別のチャンクをソートまたは書き込みできるため、アイドル時間が削減されます。

結論

外部ソートは、ファイルのチャンク化による並列化に自然に役立ちます。このアプローチにより、マルチコア プロセッサの効率的な使用が可能になり、IO ボトルネックが最小限に抑えられるため、シングルスレッド アプローチと比較して、ソートと重複排除が大幅に高速化されます。ワークロードを効果的に分散することで、大規模なデータセットを処理する場合でも高いパフォーマンスを実現できます。

重要な考慮事項:

ファイルを 1 行ずつ読み取るときに、合計行数をカウントすることもできます。プロセス中、重複排除は 2 つの段階で実行されます。最初はチャンク中に、次にマージ中にです。その結果、最終出力ファイルの行数を数える必要がなくなります。代わりに、一意の行の合計数は次のように計算できます。

finalCount := totalLines - (DeletedInChunks DeletedInMerge)

このアプローチでは、重複排除の各段階で削除を追跡することで、冗長な操作を回避し、計算をより効率的にします。これにより、数分を節約できます。

もう一つ:

膨大な量のデータでは、わずかなパフォーマンスの向上も重要であるため、string.Slice() の高速化された独自の類似物を使用することをお勧めします

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

さらに、並列処理を管理するためにワーカー テンプレートが採用され、スレッド数が構成可能になりました。デフォルトでは、スレッド数は runtime.NumCPU() に設定されており、プログラムは利用可能なすべての CPU コアを効率的に利用できます。このアプローチにより、リソースの最適な使用が確保されると同時に、環境の特定の要件や制限に基づいてスレッド数を調整する柔軟性も提供されます。

重要な注意事項: マルチスレッドを使用する場合、競合状態を防止し、プログラムの正確性を確保するために共有データを保護することが重要です。これは、実装の特定の要件に応じて、ミューテックス、チャネル (Go の場合)、またはその他の同時実行安全な手法などの同期メカニズムを使用することで実現できます。

これまでのまとめ

これらのアイデアを実装した結果、M.2 SSD と組み合わせた Ryzen 7700 プロセッサーでコードを実行すると、タスクが約 40 分で完了しました。

圧縮を考慮しています。

データ量とそれに伴う大規模なディスク操作の存在に基づいて、次に考慮すべき点は、圧縮の使用でした。圧縮には Brotli アルゴリズムが選択されました。高い圧縮率と効率的な解凍により、中間ストレージおよび処理中に良好なパフォーマンスを維持しながら、ディスク IO オーバーヘッドを削減するのに適した選択肢となります。

Brotli を使用したチャンク化の例を次に示します。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

圧縮を使用した結果

圧縮の有効性については議論の余地があり、ソリューションが使用される条件に大きく依存します。圧縮率を高くすると、ディスク容量の使用量が減りますが、それに比例して全体の実行時間も長くなります。遅い HDD では、ディスク I/O がボトルネックになるため、圧縮によって速度が大幅に向上します。逆に、高速 SSD では、圧縮により実行時間が遅くなる可能性があります。

M.2 SSD を搭載したシステムで実施したテストでは、圧縮によるパフォーマンスの向上は見られませんでした。結果として、最終的には諦めることにしました。ただし、コードが複雑になり、可読性が低下する可能性があるリスクを冒しても構わない場合は、構成可能なフラグによって制御されるオプション機能として圧縮を実装することもできます。

次に何をすべきか

さらなる最適化を追求するために、ソリューションのバイナリ変換に注目します。テキストベースの IP アドレスが数値ハッシュに変換されると、その後のすべての操作はバイナリ形式で実行できます。

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

バイナリ形式の利点

  • コンパクトさ:

各数値は固定サイズを占めます (例: uint32 = 4 バイト)。
100 万の IP アドレスの場合、ファイル サイズはわずか約 4 MB になります。

  • 高速処理:

文字列を解析する必要がないため、読み取りおよび書き込み操作が高速化されます。

  • クロスプラットフォームの互換性:

一貫したバイト順序 (LittleEndian または BigEndian) を使用することにより、異なるプラットフォーム間でファイルを読み取ることができます。

結論
データをバイナリ形式で保存することは、数値の書き込みと読み取りを行うためのより効率的な方法です。完全に最適化するには、データの書き込みプロセスと読み取りプロセスの両方をバイナリ形式に変換します。書き込みには binary.Write を使用し、読み取りには binary.Read を使用します。

バイナリ形式で動作する processChunk 関数は次のようになります。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}




なんと?!かなり遅くなりました!!

バイナリ形式では動作が遅くなりました。 1 億行 (IP アドレス) を持つファイルは、テキスト形式では 25 秒かかるのに対し、バイナリ形式では 4.5 分で処理されます。チャンク サイズとワーカー数は同じです。なぜ?

バイナリ形式での作業はテキスト形式よりも遅くなる可能性があります
バイナリ形式を使用すると、binary.Read および binary.Write の動作方法の詳細と、実装における潜在的な非効率性により、テキスト形式よりも遅くなる場合があります。これが起こる主な理由は次のとおりです:

I/O 操作

  • テキスト形式:

行の読み取り用に最適化された bufio.Scanner を使用して、より大きなデータ ブロックを処理します。
行全体を読み取り、解析します。これは、小規模な変換操作の場合により効率的です。

  • バイナリ形式:

binary.Read は一度に 4 バイトを読み取るため、小規模な I/O 操作がより頻繁に行われます。
binary.Read を頻繁に呼び出すと、ユーザー空間とシステム空間の間の切り替えによるオーバーヘッドが増加します。

解決策: バッファを使用して複数の数値を一度に読み取ります。

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}

バッファリングによりパフォーマンスが向上するのはなぜですか?

  • I/O 操作の削減:
    各数値をディスクに直接書き込む代わりに、データはバッファーに蓄積され、より大きなブロックに書き込まれます。

  • オーバーヘッドの削減:

各ディスク書き込み操作では、プロセスとオペレーティング システム間のコンテキストの切り替えによりオーバーヘッドが発生します。バッファリングにより、このような呼び出しの数が減ります。

バイナリ多相マージのコードも紹介します:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

結果は素晴らしく、80 億行の 110Gb ファイルで 14 分でした!

Image description

素晴らしい結果ですね! 80 億行を含む 110 GB のファイルを 14 分で処理するというのは、実に驚異的です。それは次の能力を実証します:

  • バッファリングされた I/O:

行ごとまたは値ごとではなく、メモリ内の大きなデータ チャンクを処理することにより、ボトルネックとなることが多い I/O 操作の数が大幅に削減されます。

  • 最適化されたバイナリ処理:

バイナリの読み取りと書き込みに切り替えると、解析のオーバーヘッドが最小限に抑えられ、中間データのサイズが削減され、メモリ効率が向上します。

  • 効率的な重複排除:

重複排除と並べ替えにメモリ効率の高いアルゴリズムを使用することで、CPU サイクルが効果的に利用されます。

  • 並列処理:

ゴルーチンとチャネルを利用してワーカー間でワークロードを並列化することで、CPU とディスクの使用率のバランスをとります。

結論

最後に、最終的なソリューションの完全なコードを次に示します。自由に使用して、ニーズに合わせて調整してください!

Gopher 用の外部マージ ソリューション

頑張ってください!

以上が外部マージ問題 - Gopher の完全ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。