Heim >Backend-Entwicklung >Golang >Externes Zusammenführungsproblem – Vollständiger Leitfaden für Gophers

Externes Zusammenführungsproblem – Vollständiger Leitfaden für Gophers

Susan Sarandon
Susan SarandonOriginal
2025-01-12 08:09:42375Durchsuche

Das externe Sortierproblem ist ein bekanntes Thema in Informatikkursen und wird häufig als Lehrmittel eingesetzt. Es kommt jedoch selten vor, dass jemand jemanden trifft, der tatsächlich eine Lösung für dieses Problem in Code für ein bestimmtes technisches Szenario implementiert hat, geschweige denn die erforderlichen Optimierungen in Angriff genommen hat. Die Begegnung mit dieser Herausforderung während eines Hackathons hat mich dazu inspiriert, diesen Artikel zu schreiben.

So, hier ist die Hackathon-Aufgabe:

Sie haben eine einfache Textdatei mit IPv4-Adressen. Eine Zeile ist eine Adresse, Zeile für Zeile:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

Die Datei hat eine unbegrenzte Größe und kann Dutzende und Hunderte von Gigabyte belegen.

Sie sollten die Anzahl der eindeutigen Adressen in dieser Datei mit möglichst wenig Speicher und Zeit berechnen. Es gibt einen „naiven“ Algorithmus zur Lösung dieses Problems (Zeile für Zeile lesen, Zeilen in HashSet einfügen). Es ist besser, wenn Ihre Implementierung komplizierter und schneller ist als dieser naive Algorithmus.

Eine 120-GB-Datei mit 8 Milliarden Zeilen wurde zum Parsen eingereicht.

Es gab keine besonderen Anforderungen an die Geschwindigkeit der Programmausführung. Nachdem ich jedoch die verfügbaren Informationen zu diesem Thema online schnell überprüft hatte, kam ich zu dem Schluss, dass eine akzeptable Ausführungszeit für Standardhardware (z. B. einen Heim-PC) etwa eine Stunde oder weniger betragen würde.

Aus offensichtlichen Gründen kann die Datei nicht vollständig gelesen und verarbeitet werden, es sei denn, das System verfügt über mindestens 128 GB Speicher. Aber ist das Arbeiten mit Chunks und das Zusammenführen unvermeidlich?

Wenn Sie mit der Implementierung einer externen Zusammenführung nicht vertraut sind, empfehle ich Ihnen, sich zunächst mit einer alternativen Lösung vertraut zu machen, die akzeptabel, wenn auch alles andere als optimal ist.

Idee

  • Erstellen Sie eine 2^32-Bit-Bitmap. Dies ist ein uint64-Array, da uint64 64 Bits enthält.

  • Für jede IP:

  1. Parsen Sie die String-Adresse in vier Oktette: A.B.C.D.
  2. Übersetzen Sie es in eine Zahl ipNum = (A << 24) | (B << 16) | (C << 8) | D.
  3. Setzen Sie das entsprechende Bit in der Bitmap.
  • 1. Nachdem Sie alle Adressen gelesen haben, gehen Sie die Bitmap durch und zählen Sie die Anzahl der gesetzten Bits.

Vorteile:
Sehr schnelle Eindeutigkeitserkennung: Setzen des Bits O(1), keine Überprüfung nötig, einfach setzen.

Kein Aufwand für Hashing, Sortieren usw.
Nachteile:
Riesiger Speicherverbrauch (512 MB für den gesamten IPv4-Speicherplatz, ohne Berücksichtigung des Overheads).

Wenn die Datei riesig, aber kleiner als der gesamte IPv4-Speicherplatz ist, kann dies zwar zeitlich vorteilhaft sein, aber hinsichtlich des Speichers nicht immer sinnvoll.

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

Dieser Ansatz ist unkompliziert und zuverlässig und daher eine praktikable Option, wenn keine Alternativen verfügbar sind. In einer Produktionsumgebung – insbesondere wenn eine optimale Leistung angestrebt wird – ist es jedoch unerlässlich, eine effizientere Lösung zu entwickeln.

Daher umfasst unser Ansatz Chunking, interne Zusammenführungssortierung und Deduplizierung.

Das Prinzip der Parallelisierung bei der externen Sortierung

  1. Chunks lesen und umwandeln:

Die Datei ist in relativ kleine Teile (Blöcke) aufgeteilt, beispielsweise ein paar hundert Megabyte oder ein paar Gigabyte. Für jeden Block:

  • Eine Goroutine (oder ein Pool von Goroutinen) wird gestartet, die den Block liest, die IP-Adressen in Zahlen zerlegt und sie in einem temporären Array im Speicher speichert.

  • Dann wird dieses Array sortiert (z. B. mit dem Standard sort.Slice) und das Ergebnis wird nach dem Entfernen von Duplikaten in eine temporäre Datei geschrieben.

Da jeder Teil unabhängig verarbeitet werden kann, können Sie mehrere solcher Handler parallel ausführen, wenn Sie über mehrere CPU-Kerne und ausreichend Festplattenbandbreite verfügen. Dadurch können Sie Ressourcen möglichst effizient nutzen.

  1. Sortierte Blöcke zusammenführen (Zusammenführungsschritt):

Sobald alle Blöcke sortiert und in temporäre Dateien geschrieben sind, müssen Sie diese sortierten Listen in einem einzigen sortierten Stream zusammenführen und Duplikate entfernen:

  • Ähnlich wie beim externen Sortiervorgang können Sie die Zusammenführung parallelisieren, indem Sie mehrere temporäre Dateien in Gruppen aufteilen, diese parallel zusammenführen und die Anzahl der Dateien schrittweise reduzieren.

  • Dadurch bleibt ein großer sortierter und deduplizierter Ausgabestrom übrig, aus dem Sie die Gesamtzahl der eindeutigen IPs berechnen können.

Vorteile der Parallelisierung:

  • Verwendung mehrerer CPU-Kerne:
    Das Single-Thread-Sortieren eines sehr großen Arrays kann langsam sein, aber wenn Sie über einen Multi-Core-Prozessor verfügen, können Sie mehrere Blöcke parallel sortieren, was den Prozess um ein Vielfaches beschleunigt.

  • Lastausgleich:

Wenn die Stückgrößen mit Bedacht gewählt werden, kann jedes Stück in etwa der gleichen Zeit verarbeitet werden. Wenn einige Chunks größer/kleiner oder komplexer sind, können Sie ihre Verarbeitung dynamisch auf verschiedene Goroutinen verteilen.

  • IO-Optimierung:

Parallelisierung ermöglicht das Lesen eines Blocks, während ein anderer sortiert oder geschrieben wird, wodurch Leerlaufzeiten reduziert werden.

Fazit

Externe Sortierung eignet sich natürlich für die Parallelisierung durch Datei-Chunking. Dieser Ansatz ermöglicht die effiziente Nutzung von Multi-Core-Prozessoren und minimiert IO-Engpässe, was zu einer deutlich schnelleren Sortierung und Deduplizierung im Vergleich zu einem Single-Thread-Ansatz führt. Durch die effektive Verteilung der Arbeitslast können Sie auch bei der Verarbeitung großer Datenmengen eine hohe Leistung erzielen.

Wichtige Überlegung:

Während wir die Datei Zeile für Zeile lesen, können wir auch die Gesamtzahl der Zeilen zählen. Während des Prozesses führen wir die Deduplizierung in zwei Schritten durch: zuerst beim Chunking und dann beim Zusammenführen. Daher ist es nicht erforderlich, die Zeilen in der endgültigen Ausgabedatei zu zählen. Stattdessen kann die Gesamtzahl der eindeutigen Zeilen wie folgt berechnet werden:

finalCount := totalLines - (DeletedInChunks DeletedInMerge)

Dieser Ansatz vermeidet redundante Vorgänge und macht die Berechnung effizienter, indem die Löschungen in jeder Phase der Deduplizierung verfolgt werden. Das spart uns mehrere Minuten.

Noch etwas:

Da bei großen Datenmengen jeder kleine Leistungsgewinn von Bedeutung ist, empfehle ich die Verwendung eines selbstgeschriebenen beschleunigten Analogons von strings.Slice()

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

Zusätzlich wurde eine Worker-Vorlage zur Verwaltung der Parallelverarbeitung übernommen, wobei die Anzahl der Threads konfigurierbar ist. Standardmäßig ist die Anzahl der Threads auf runtime.NumCPU() eingestellt, sodass das Programm alle verfügbaren CPU-Kerne effizient nutzen kann. Dieser Ansatz gewährleistet eine optimale Ressourcennutzung und bietet gleichzeitig die Flexibilität, die Anzahl der Threads basierend auf den spezifischen Anforderungen oder Einschränkungen der Umgebung anzupassen.

Wichtiger Hinweis: Bei der Verwendung von Multithreading ist es wichtig, gemeinsam genutzte Daten zu schützen, um Race Conditions zu verhindern und die Korrektheit des Programms sicherzustellen. Dies kann durch die Verwendung von Synchronisierungsmechanismen wie Mutexes, Kanälen (in Go) oder anderen nebenläufigkeitssicheren Techniken erreicht werden, abhängig von den spezifischen Anforderungen Ihrer Implementierung.

Bisherige Zusammenfassung

Die Umsetzung dieser Ideen führte zu Code, der bei Ausführung auf einem Ryzen 7700-Prozessor gepaart mit einer M.2-SSD die Aufgabe in etwa 40 Minuten erledigte.

Berücksichtigung der Komprimierung.

Die nächste Überlegung, basierend auf dem Datenvolumen und damit dem Vorhandensein erheblicher Festplattenvorgänge, war die Verwendung von Komprimierung. Zur Komprimierung wurde der Brotli-Algorithmus gewählt. Sein hohes Komprimierungsverhältnis und die effiziente Dekomprimierung machen es zu einer geeigneten Wahl, um den Festplatten-IO-Overhead zu reduzieren und gleichzeitig eine gute Leistung während der Zwischenspeicherung und -verarbeitung aufrechtzuerhalten.

Hier das Beispiel für das Aufteilen mit Brotli:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

Ergebnisse der Verwendung der Komprimierung

Die Wirksamkeit der Komprimierung ist umstritten und hängt stark von den Bedingungen ab, unter denen die Lösung verwendet wird. Eine hohe Komprimierung reduziert die Speicherplatznutzung, erhöht jedoch proportional die Gesamtausführungszeit. Auf langsamen Festplatten kann die Komprimierung die Geschwindigkeit erheblich steigern, da die Festplatten-E/A zum Engpass wird. Umgekehrt kann die Komprimierung auf schnellen SSDs zu langsameren Ausführungszeiten führen.

In Tests, die auf einem System mit M.2-SSDs durchgeführt wurden, zeigte die Komprimierung keine Leistungsverbesserung. Aus diesem Grund habe ich mich letztendlich dazu entschieden, darauf zu verzichten. Wenn Sie jedoch bereit sind, das Risiko einzugehen, dass Ihr Code komplexer wird und möglicherweise seine Lesbarkeit beeinträchtigt, können Sie die Komprimierung als optionale Funktion implementieren, die durch ein konfigurierbares Flag gesteuert wird.

Was als nächstes zu tun ist

Im Streben nach weiterer Optimierung richten wir unser Augenmerk auf die binäre Transformation unserer Lösung. Sobald die textbasierten IP-Adressen in numerische Hashes umgewandelt wurden, können alle nachfolgenden Vorgänge im Binärformat ausgeführt werden.

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

Vorteile des Binärformats

  • Kompaktheit:

Jede Zahl nimmt eine feste Größe ein (z. B. uint32 = 4 Bytes).
Bei 1 Million IP-Adressen beträgt die Dateigröße nur ~4 MB.

  • Schnelle Bearbeitung:

Es ist nicht erforderlich, Zeichenfolgen zu analysieren, was die Lese- und Schreibvorgänge beschleunigt.

  • Plattformübergreifende Kompatibilität:

Durch die Verwendung einer konsistenten Bytereihenfolge (entweder LittleEndian oder BigEndian) können Dateien auf verschiedenen Plattformen gelesen werden.

Fazit
Das Speichern von Daten im Binärformat ist eine effizientere Methode zum Schreiben und Lesen von Zahlen. Konvertieren Sie für eine vollständige Optimierung sowohl den Datenschreib- als auch den Lesevorgang in das Binärformat. Verwenden Sie „binary.Write“ zum Schreiben und „binary.Read“ zum Lesen.

So könnte die Funktion „processChunk“ aussehen, um mit dem Binärformat zu arbeiten:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}




WTF?! Es wurde viel langsamer!!

Im Binärformat funktionierte es langsamer. Eine Datei mit 100 Millionen Zeilen (IP-Adressen) wird in binärer Form in 4,5 Minuten verarbeitet, im Vergleich zu 25 Sekunden im Text. Bei gleicher Blockgröße und gleicher Anzahl an Arbeitern. Warum?

Das Arbeiten mit dem Binärformat ist möglicherweise langsamer als das Textformat
Die Verwendung des Binärformats kann aufgrund der Besonderheiten der Funktionsweise von Binary.Read und Binary.Write sowie potenzieller Ineffizienzen bei ihrer Implementierung manchmal langsamer sein als das Textformat. Hier sind die Hauptgründe, warum dies passieren könnte:

E/A-Operationen

  • Textformat:

Funktioniert mit größeren Datenblöcken mit bufio.Scanner, der für das Lesen von Zeilen optimiert ist.
Liest ganze Zeilen und analysiert sie, was bei kleinen Konvertierungsvorgängen effizienter sein kann.

  • Binärformat:

binary.Read liest jeweils 4 Bytes, was zu häufigeren kleinen E/A-Vorgängen führt.
Häufige Aufrufe von „binary.Read“ erhöhen den Overhead durch den Wechsel zwischen Benutzer- und Systembereich.

Lösung:Verwenden Sie einen Puffer, um mehrere Zahlen gleichzeitig zu lesen.

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}

Warum verbessert Pufferung die Leistung?

  • Weniger E/A-Vorgänge:
    Anstatt jede Zahl direkt auf die Festplatte zu schreiben, werden die Daten in einem Puffer gesammelt und in größeren Blöcken geschrieben.

  • Reduzierter Overhead:

Jeder Datenträgerschreibvorgang verursacht Overhead aufgrund des Kontextwechsels zwischen dem Prozess und dem Betriebssystem. Durch die Pufferung wird die Anzahl solcher Anrufe reduziert.

Wir präsentieren auch den Code für die binäre Mehrphasenzusammenführung:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

Das Ergebnis ist fantastisch: 14 Minuten für eine 110-GB-Datei mit 8 Milliarden Zeilen!

Image description

Das ist ein herausragendes Ergebnis! Die Verarbeitung einer 110-GB-Datei mit 8 Milliarden Zeilen in 14 Minuten ist in der Tat beeindruckend. Es demonstriert die Kraft von:

  • Gepufferte E/A:

Durch die Verarbeitung großer Datenmengen im Speicher statt Zeile für Zeile oder Wert für Wert reduzieren Sie die Anzahl der E/A-Vorgänge, die häufig den Engpass darstellen, drastisch.

  • Optimierte Binärverarbeitung:

Der Wechsel zum binären Lesen und Schreiben minimiert den Parsing-Aufwand, reduziert die Größe der Zwischendaten und verbessert die Speichereffizienz.

  • Effiziente Deduplizierung:

Der Einsatz speichereffizienter Algorithmen zur Deduplizierung und Sortierung stellt sicher, dass CPU-Zyklen effektiv genutzt werden.

  • Parallelität:

Durch die Nutzung von Goroutinen und Kanälen zur Parallelisierung der Arbeitslast aller Mitarbeiter wird die CPU- und Festplattenauslastung ausgeglichen.

Abschluss

Zum Abschluss finden Sie hier den vollständigen Code für die endgültige Lösung. Nutzen Sie es gerne und passen Sie es an Ihre Bedürfnisse an!

Externe Merge-Lösung für Gophers

Viel Glück!

Das obige ist der detaillierte Inhalt vonExternes Zusammenführungsproblem – Vollständiger Leitfaden für Gophers. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn