首頁  >  文章  >  後端開發  >  解決 Go 中的十億行挑戰(從 到 s)

解決 Go 中的十億行挑戰(從 到 s)

王林
王林原創
2024-08-05 20:04:32581瀏覽

Resolvendo o desafio de um bilhão de linhas em Go (de para s)

不久前,一位朋友告訴我一個挑戰,涉及讀取 10 億行文件。我發現這個想法很有趣,但由於當時是大學考試週,所以我最終把它留到以後再看。幾個月後,我看到了 Theo 拍攝的有關挑戰的視頻,並決定仔細觀察。

十億行挑戰賽的目標是計算一系列城市的最低、最高和平均溫度- 具體來說,這個列表中有10 億個項目,其中每個項目由一個城市的名稱組成和溫度,每個城市都可以出現多次。最後,程式必須按城市名稱的字母順序顯示這些值。

我認為嘗試解決挑戰會很有趣,即使沒有獎勵。無論如何,我寫了這篇文字來描述我的過程。

第一次嘗試:使程式碼工作

每當我需要解決更複雜的問題時,我的首要目標就是讓程式運作。它可能不是最快或最乾淨的程式碼,但它是有效的程式碼。

基本上,我創建了位置結構來表示列表中的每個城市,包含最低和最高溫度、溫度總和以及城市在列表中出現的次數(最後兩個是計算平均值所必需的) 。我知道有一種方法可以直接計算平均值,而無需儲存溫度數及其總和。但正如我之前提到的,目標是讓程式碼正常運作。

資料清單由城市名稱後面跟著溫度組成,並以分號分隔。例如:

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;25.8
Kuopio;-6.8

讀取資料最簡單的方法是使用 Scan,它允許您一次讀取一行。透過該行,您可以將其分為兩部分:分號之前和之後的值。要取得溫度,您可以使用 strconv.ParseFloat,它將字串轉換為浮點數。第一次實現的完整程式碼如下:

package main

import (
    "bufio"
    "fmt"
    "math"
    "os"
    "sort"
    "strconv"
    "strings"
)

type Location struct {
    min   float64
    max   float64
    sum   float64
    count int
}

func NewLocation() *Location {
    return &Location{
        min:   math.MaxInt16,
        max:   math.MinInt16,
        sum:   0,
        count: 0,
    }
}

func (loc *Location) Add(temp float64) {
    if temp < loc.min {
        loc.min = temp
    } else if temp > loc.max {
        loc.max = temp
    }

    loc.sum += temp
    loc.count += 1
}

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

func main() {
    flag.Parse()
    if *cpuprofile != "" {
        f, err := os.Create(*cpuprofile)
        if err != nil {
            log.Fatal(err)
        }
        pprof.StartCPUProfile(f)
        defer pprof.StopCPUProfile()
    }

    file, _ := os.Open("./measurements.txt")
    defer file.Close()

    m := map[string]*Location{}

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        name, tempStr, _ := strings.Cut(line, ";")
        temp, _ := strconv.ParseFloat(tempStr, 32)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }

    keys := make([]string, 0, len(m))
    for k := range m {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, name := range keys {
        loc := m[name]
        mean := loc.sum / float64(loc.count)
        fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max)
    }
}

這個更簡單的版本運行大約需要 97 秒。

優化字串到浮點數的轉換

分析執行設定文件,我意識到最大的瓶頸之一是 strconv.ParseFloat 函數。基本上,它的總執行時間是 23 秒(約佔總時間的 23%)。

這個函數的問題是它是通用的,也就是說,它可以與任何有效的浮點數一起使用。然而,我們的數據具有非常特定的溫度格式。請參閱下面的範例:

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;5.8
Kuopio;-6.8

溫度格式始終相同:點前一位或兩位數字,點後一位數字,並且可能在開頭包含一個減號。因此,我們可以建立一個以特定方式轉換值的函數,從而最佳化流程,而無需執行 ParseFloat 的所有通用檢查。

func bytesToTemp(b []byte) float64 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return float64(v*isNeg) / 10
}

為了以位元組格式而不是字串讀取數據,我將掃描器的返回從字串更改為位元組

line := scanner.Bytes()
before, after, _ := bytes.Cut(line, []byte{';'})
name := string(before)
temp := bytesToTemp(after)

這些小改動將執行時間縮短至 75 秒。

讀取更大的資料區塊

使用Scan的最大優點是程式不需要一次將整個檔案載入到記憶體中。相反,它可以讓您逐行讀取,以效能換取記憶體。

要注意的是,一次讀取一行和一次載入 14 GB 資料之間存在折衷。這個中間立場是讀取區塊,即記憶體片段。這樣,我們就可以讀取 128 MB 的區塊,而不是一次讀取整個檔案。

buf := make([]byte, chunkSize)
reader := bufio.NewReader(file)
var leftData []byte
for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]

    lines := bytes.Split(chunk[:lastIndex], []byte{'\n'})

    for _, line := range lines {
        before, after, _ := bytes.Cut(line, []byte{';'})
        name := string(before)
        temp := bytesToTemp(after)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }
}

結果,執行時間下降到了 70 秒。比以前好多了,但仍有進步的空間。

變更資料類型

事實上,整個挑戰都圍繞著帶有小數位的數字。然而,處理浮點始終是一個巨大的挑戰(請參閱 IEEE-754)。既然如此,為什麼不用整數來表示溫度呢?

type Location struct {
    min   int16
    max   int16
    sum   int32
    count int32
}

如之前所定義,溫度始終由最多三位數字表示。因此,除去逗號,值可以在-999和999之間變化,因此int16足以表示它們。對於求和和計數,int32 綽綽有餘,因為該類型的變化範圍為 -2147483648 到 2147483647。

Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp. Para isso, mudamos o retorno para int16 e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.

func bytesToTemp(b []byte) int16 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return v * isNeg
}

Para finalizar, modifiquei a função Add para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos. Não é muito, mas uma vitória é uma vitória.

Melhorando a Performance da Conversão de Bytes para String

Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes).

Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.

Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe por um motivo).

name := unsafe.String(unsafe.SliceData(before), len(before))

Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe pode se tornar bem inseguro.

Com essa mudança, reduzimos o tempo de execução para 51 segundos. Nada mal.

Reimplementando bytes.Cut

Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?

Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?

idx := 0
if line[len(line)-4] == ';' {
    idx = len(line) - 4
} else if line[len(line)-5] == ';' {
    idx = len(line) - 5
} else {
    idx = len(line) - 6
}

before := line[:idx]
after := line[idx+1:]

Com isso, o tempo de execução foi para 46 segundos, cerca de 5 segundos a menos que antes (quem diria, não é?).

Paralelismo para acelerar o processamento

Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?

Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.

Vale destacar que workers, nesse caso, é uma constante que define quantas goroutines serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.

chunkChan := make(chan []byte, workers)
var wg sync.WaitGroup
wg.Add(workers)

O próximo passo foi criar as goroutines que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.

for i := 0; i < workers; i++ {
    go func() {
        for chunk := range chunkChan {
            lines := bytes.Split(chunk, []byte{'\n'})
            for _, line := range lines {
                before, after := parseLine(line)
                name := unsafe.String(unsafe.SliceData(before), len(before))
                temp := bytesToTemp(after)

                loc, ok := m[name]
                if !ok {
                    loc = NewLocation()
                    m[name] = loc
                }
                loc.Add(temp)
            }
        }
        wg.Done()
    }()
}

Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:

for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]
    chunkChan <- chunk[:lastIndex]
}
close(chunkChan)
wg.Wait()

Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.

Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.

A segunda alternativa consiste em criar um mapa para cada uma das goroutines, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.

close(chunkChan)

go func() {
    wg.Wait()
    close(mapChan)
}()

keys := make([]string, 0, 825)

m := map[string]*Location{}
for lm := range mapChan {
    for lk, lLoc := range lm {
        loc, ok := m[lk]
        if !ok {
            keys = append(keys, lk)
            m[lk] = lLoc
            continue
        }

        if lLoc.min < loc.min {
            loc.min = lLoc.min
        }
        if lLoc.max > loc.max {
            loc.max = lLoc.max
        }
        loc.sum += lLoc.sum
        loc.count += lLoc.count
    }
}

Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.

Enfim, o nosso tempo de processamento caiu de 46 segundos para... 12 segundos.

Otimizando a quebra de linhas no chunk

Todas as vezes que eu analisava o profile, a função bytes.Split chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?

Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.

start := 0
start := 0
for end, b := range chunk {
    if b != '\n' {
        continue
    }
    before, after := parseLine(chunk[start:end])
    // ...
    start = end + 1
}

Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos.

Executed in    8.45 secs    fish           external
   usr time   58.47 secs  152.00 micros   58.47 secs
   sys time    4.52 secs  136.00 micros    4.52 secs

Próximos passos

Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.

No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.

以上是解決 Go 中的十億行挑戰(從 到 s)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
上一篇:Go 的嬰兒學步下一篇:Go 的嬰兒學步