首页 >后端开发 >Golang >外部合并问题 - Gophers 完整指南

外部合并问题 - Gophers 完整指南

Susan Sarandon
Susan Sarandon原创
2025-01-12 08:09:42409浏览

外部排序问题是计算机科学课程中的一个众所周知的话题,并且经常被用作教学工具。然而,很少有人能够在特定技术场景的代码中实际实现此问题的解决方案,更不用说解决所需的优化了。在一次黑客马拉松中遇到这个挑战激发了我写这篇文章的灵感。

所以,这是黑客马拉松任务:

您有一个包含 IPv4 地址的简单文本文件。一行是一个地址,逐行:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

文件大小无限制,可以占用数十、数百GB。

您应该使用尽可能少的内存和时间来计算该文件中唯一地址的数量。有一个“天真的”算法可以解决这个问题(逐行读取,将行放入 HashSet)。如果您的实现比这个简单的算法更复杂、更快,那就更好了。

提交了一个 120GB、80 亿行的文件进行解析。

对于程序执行速度没有具体要求。然而,在快速查看有关该主题的在线可用信息后,我得出结论,标准硬件(例如家用 PC)可接受的执行时间约为一小时或更短。

由于显而易见的原因,除非系统至少有 128GB 可用内存,否则无法完整读取和处理文件。但是使用块和合并是不可避免的吗?

如果您不习惯实施外部合并,我建议您首先熟悉一个可以接受的替代解决方案,尽管远非最佳。

主意

  • 创建 2^32 位位图。这是一个 uint64 数组,因为 uint64 包含 64 位。

  • 对于每个 IP:

  1. 将字符串地址解析为四个八位字节:A.B.C.D.
  2. 将其转换为数字 ipNum = (A
  3. 设置位图中对应的位。
  • 1.读取所有地址后,遍历位图并计算设置位的数量。

优点:
非常快速的唯一性检测:设置位 O(1),无需检查,只需设置即可。

没有哈希、排序等开销
缺点:
巨大的内存消耗(整个 IPv4 空间需要 512 MB,不考虑开销)。

如果文件很大,但小于完整的 IPv4 空间,这在时间方面仍然具有优势,但在内存方面并不总是合理。

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

这种方法简单可靠,在没有替代方案时成为可行的选择。然而,在生产环境中,尤其是当旨在实现最佳性能时,开发更高效的解决方案至关重要。

因此,我们的方法涉及分块、内部合并排序和重复数据删除。

外部排序的并行化原理

  1. 读取和转换块:

文件被分割成相对较小的部分(块),比如几百兆字节或几千兆字节。对于每个块:

  • 启动一个 goroutine(或一个 goroutine 池),它读取块,将 IP 地址解析为数字并将它们存储在内存中的临时数组中。

  • 然后对该数组进行排序(例如,使用标准 sort.Slice),并在删除重复项后将结果写入临时文件。

由于每个部分都可以独立处理,因此如果您有多个 CPU 核心和足够的磁盘带宽,您可以并行运行多个此类处理程序。这将使您能够尽可能有效地使用资源。

  1. 合并排序的块(合并步骤):

所有块都排序并写入临时文件后,您需要将这些排序列表合并到单个排序流中,删除重复项:

  • 与外部排序过程类似,可以通过将多个临时文件分组,并行合并并逐渐减少文件数量来并行合并。

  • 这会留下一个大的已排序和去重的输出流,您可以从中计算唯一 IP 的总数。

并行化的优点:

  • 使用多个CPU核心:
    对非常大的数组进行单线程排序可能会很慢,但如果您有多核处理器,则可以并行对多个块进行排序,从而将过程加快数倍。

  • 负载平衡:

如果明智地选择块大小,则可以在大约相同的时间内处理每个块。如果某些块更大/更小或更复杂,您可以在不同的 goroutine 之间动态分配它们的处理。

  • IO 优化:

并行化允许读取一个块,同时对另一个块进行排序或写入,从而减少空闲时间。

底线

外部排序自然适合通过文件分块进行并行化。这种方法可以有效利用多核处理器并最大限度地减少 IO 瓶颈,与单线程方法相比,排序和重复数据删除速度显着加快。通过有效地分配工作负载,即使在处理海量数据集时也可以获得高性能。

重要考虑因素:

在逐行读取文件的同时,我们还可以统计总行数。在此过程中,我们分两个阶段执行重复数据删除:首先是在分块期间,然后是在合并期间。因此,无需计算最终输出文件中的行数。相反,唯一行的总数可以计算为:

finalCount :=totalLines - (DeletedInChunks DeletedInMerge)

这种方法避免了冗余操作,并通过在重复数据删除的每个阶段跟踪删除操作来提高计算效率。这为我们节省了几分钟。

还有一件事:

由于任何小的性能提升对大量数据都很重要,我建议使用自行编写的字符串加速模拟。Slice()

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

此外,采用了工作模板来管理并行处理,线程数量是可配置的。默认情况下,线程数设置为 runtime.NumCPU(),允许程序有效地利用所有可用的 CPU 核心。这种方法确保了最佳的资源使用,同时还提供了根据环境的特定要求或限制来灵活调整线程数量的能力。

重要提示:使用多线程时,保护共享数据以防止竞争条件并确保程序的正确性至关重要。这可以通过使用同步机制来实现,例如互斥体、通道(在 Go 中)或其他并发安全技术,具体取决于您的实现的具体要求。

到目前为止的总结

这些想法的实现产生了代码,当在搭配 M.2 SSD 的 Ryzen 7700 处理器上执行时,可以在大约 40 分钟内完成任务。

考虑压缩。

基于数据量以及因此存在的重要磁盘操作,下一个考虑因素是压缩的使用。选择 Brotli 算法进行压缩。其高压缩比和高效解压缩使其成为减少磁盘IO开销同时在中间存储和处理过程中保持良好性能的合适选择。

这是使用 Brotli 进行分块的示例:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

使用压缩的结果

压缩的有效性是有争议的,并且高度依赖于解决方案的使用条件。高压缩可减少磁盘空间的使用,但会成比例地增加总体执行时间。在慢速 HDD 上,压缩可以显着提高速度,因为磁盘 I/O 成为瓶颈。相反,在快速 SSD 上,压缩可能会导致执行时间变慢。

在配备 M.2 SSD 的系统上进行的测试中,压缩并未显示出性能提升。结果,我最终决定放弃它。但是,如果您愿意冒增加代码复杂性并可能降低其可读性的风险,则可以将压缩实现为可选功能,由可配置标志控制。

接下来做什么

为了追求进一步优化,我们将注意力转向解决方案的二进制转换。一旦基于文本的 IP 地址转换为数字哈希值,所有后续操作都可以以二进制格式执行。

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

二进制格式的优点

  • 紧凑性:

每个数字占用固定大小(例如,uint32 = 4 字节)。
对于 100 万个 IP 地址,文件大小仅为 ~4 MB。

  • 快速处理:

无需解析字符串,加快读写操作

  • 跨平台兼容性:

通过使用一致的字节顺序(LittleEndian 或 BigEndian),可以跨不同平台读取文件。

结论
以二进制格式存储数据是一种更有效的写入和读取数字的方法。为了完整优化,将数据写入和读取过程都转换为二进制格式。使用binary.Write进行写入,使用binary.Read进行读取。

以下是 processChunk 函数使用二进制格式时的样子:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
    "math/bits"
)

//  Parse IP address "A.B.C.D"  => uint32 number
func ipToUint32(ipStr string) (uint32, error) {
    parts := strings.Split(ipStr, ".")
    if len(parts) != 4 {
        return 0, fmt.Errorf("invalid IP format")
    }

    var ipNum uint32
    for i := 0; i < 4; i++ {
        val, err := strconv.Atoi(parts[i])
        if err != nil || val < 0 || val > 255 {
            return 0, fmt.Errorf("invalid IP octet: %v", parts[i])
        }
        ipNum = (ipNum << 8) | uint32(val)
    }

    return ipNum, nil
}


func popcount64(x uint64) int {
    return bits.OnesCount64(x)
}

func main() {
    filePath := "ips.txt"

    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file: %v\n", err)
        return
    }
    defer file.Close()

    // IPv4 space size: 2^32 = 4,294,967,296
    // We need 2^32 bits, that is (2^32)/64 64-bit words
    totalBits := uint64(1) << 32       // 2^32
    arraySize := totalBits / 64        //how many uint64 do we need
    bitset := make([]uint64, arraySize)

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ipStr := scanner.Text()
        ipNum, err := ipToUint32(ipStr)
        if err != nil {
            fmt.Printf("Incorrect IP: %s\n", ipStr)
            continue
        }

        idx := ipNum / 64
        bit := ipNum % 64
        mask := uint64(1) << bit
        // Setting the bit
        bitset[idx] |= mask
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Error reading file: %v\n", err)
        return
    }

    count := 0
    for _, val := range bitset {
        count += bits.OnesCount64(val)
    }

    fmt.Printf("Number of unique IP addresses: %d\n", count)
}

搞什么?!速度变得慢了很多!!

在二进制格式下,工作速度变得更慢。包含 1 亿行(IP 地址)的文件以二进制形式处理需要 4.5 分钟,而文本形式则需要 25 秒。具有相同的块大小和工作人员数量。为什么?

使用二进制格式可能比文本格式慢
由于binary.Read和binary.Write操作方式的具体情况以及其实现中潜在的低效率,使用二进制格式有时可能比文本格式慢。以下是可能发生这种情况的主要原因:

I/O 操作

  • 文本格式:

使用 bufio.Scanner 处理更大的数据块,该扫描器针对读取行进行了优化。
读取整行并解析它们,这对于小型转换操作来说更加高效。

  • 二进制格式:

binary.Read 一次读取 4 个字节,导致小 I/O 操作更加频繁。
频繁调用binary.Read会增加用户空间和系统空间之间切换的开销。

解决方案:使用缓冲区一次读取多个数字。

func fastSplit(s string) []string {
    n := 1
    c := DelimiterByte

    for i := 0; i < len(s); i++ {
        if s[i] == c {
            n++
        }
    }

    out := make([]string, n)
    count := 0
    begin := 0
    length := len(s) - 1

    for i := 0; i <= length; i++ {
        if s[i] == c {
            out[count] = s[begin:i]
            count++
            begin = i + 1
        }
    }
    out[count] = s[begin : length+1]

    return out
}

为什么缓冲可以提高性能?

  • 更少的 I/O 操作:
    数据不是直接将每个数字写入磁盘,而是累积在缓冲区中并写入更大的块中。

  • 减少开销:

由于进程和操作系统之间的上下文切换,每个磁盘写入操作都会产生开销。缓冲可以减少此类调用的数量。

我们还展示了二进制多相合并的代码:

145.67.23.4
8.34.5.23
89.54.3.124
89.54.3.124
3.45.71.5
... 

结果太棒了:110Gb 文件、80 亿行仅需 14 分钟!

Image description

这真是一个了不起的成绩!在 14 分钟内处理一个包含 80 亿行的 110 GB 文件确实令人印象深刻。它展示了以下功能的力量:

  • 缓冲 I/O:

通过在内存中处理大块数据而不是逐行或逐值处理,可以大大减少 I/O 操作的数量,而 I/O 操作通常是瓶颈。

  • 优化的二进制处理:

切换为二进制读写可以最大限度地减少解析开销,减少中间数据的大小,提高内存效率。

  • 高效重复数据删除:

使用内存高效算法进行重复数据删除和排序可确保 CPU 周期得到有效利用。

  • 并行度:

利用 goroutine 和通道并行处理工作线程之间的工作负载,平衡 CPU 和磁盘利用率。

结论

最后,这是最终解决方案的完整代码。请随意使用它并根据您的需求进行调整!

Gophers 的外部合并解决方案

祝你好运!

以上是外部合并问题 - Gophers 完整指南的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn