Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Menyelesaikan cabaran bilion talian dalam Go (dari hingga s)

Menyelesaikan cabaran bilion talian dalam Go (dari hingga s)

王林
王林asal
2024-08-05 20:04:32507semak imbas

Resolvendo o desafio de um bilhão de linhas em Go (de para s)

Sebentar tadi, seorang rakan memberitahu saya tentang cabaran yang melibatkan membaca fail dengan 1 bilion baris. Saya mendapati idea itu sangat menarik, tetapi kerana ia adalah minggu peperiksaan di kolej, saya akhirnya meninggalkannya untuk melihat kemudian. Beberapa bulan kemudian, saya melihat video oleh Theo tentang cabaran itu dan memutuskan untuk melihat dengan lebih dekat.

Objektif Cabaran Satu Bilion Baris adalah untuk mengira suhu minimum, maksimum dan purata bagi senarai bandar - butirannya ialah terdapat 1 bilion item dalam senarai ini, di mana setiap item terdiri daripada nama bandar dan suhu, Setiap bandar boleh muncul lebih daripada sekali. Akhir sekali, program mesti memaparkan nilai ini dalam susunan abjad mengikut nama bandar.

Saya fikir ia akan menjadi menyeronokkan untuk mencuba menyelesaikan cabaran, walaupun tidak ada ganjaran. Bagaimanapun, saya menulis teks ini menerangkan proses saya.

Percubaan pertama: membuat kod berfungsi

Setiap kali saya perlu menyelesaikan masalah yang lebih rumit, matlamat pertama saya adalah untuk menjadikan program ini berfungsi. Ia mungkin bukan kod terpantas atau paling bersih, tetapi kod itu yang berfungsi.

Pada asasnya, saya mencipta struktur Lokasi untuk mewakili setiap bandar dalam senarai, yang mengandungi suhu minimum dan maksimum, jumlah suhu dan bilangan kali bandar muncul dalam senarai (dua yang terakhir ini diperlukan untuk mengira purata) . Saya tahu ada cara untuk mengira purata secara langsung, tanpa perlu menyimpan bilangan suhu dan jumlahnya. Tetapi seperti yang saya nyatakan sebelum ini, matlamatnya adalah untuk menjadikan kod itu berfungsi.

Senarai data terdiri daripada nama bandar diikuti dengan suhu, dipisahkan oleh koma bertitik. Contohnya:

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;25.8
Kuopio;-6.8

Cara paling mudah untuk membaca data ialah menggunakan Imbas, yang membolehkan anda membaca satu baris pada satu masa. Dengan garis, anda boleh membahagikannya kepada dua bahagian: nilai sebelum dan selepas koma bertitik. Untuk mendapatkan suhu, anda boleh menggunakan strconv.ParseFloat, yang menukar rentetan kepada apungan. Kod lengkap untuk pelaksanaan pertama boleh dilihat di bawah:

package main

import (
    "bufio"
    "fmt"
    "math"
    "os"
    "sort"
    "strconv"
    "strings"
)

type Location struct {
    min   float64
    max   float64
    sum   float64
    count int
}

func NewLocation() *Location {
    return &Location{
        min:   math.MaxInt16,
        max:   math.MinInt16,
        sum:   0,
        count: 0,
    }
}

func (loc *Location) Add(temp float64) {
    if temp < loc.min {
        loc.min = temp
    } else if temp > loc.max {
        loc.max = temp
    }

    loc.sum += temp
    loc.count += 1
}

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

func main() {
    flag.Parse()
    if *cpuprofile != "" {
        f, err := os.Create(*cpuprofile)
        if err != nil {
            log.Fatal(err)
        }
        pprof.StartCPUProfile(f)
        defer pprof.StopCPUProfile()
    }

    file, _ := os.Open("./measurements.txt")
    defer file.Close()

    m := map[string]*Location{}

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        name, tempStr, _ := strings.Cut(line, ";")
        temp, _ := strconv.ParseFloat(tempStr, 32)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }

    keys := make([]string, 0, len(m))
    for k := range m {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, name := range keys {
        loc := m[name]
        mean := loc.sum / float64(loc.count)
        fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max)
    }
}

Versi yang lebih ringkas ini mengambil masa kira-kira 97 saat untuk dijalankan.

Mengoptimumkan penukaran rentetan kepada terapung

Menganalisis profil pelaksanaan, saya menyedari bahawa salah satu kesesakan terbesar ialah fungsi strconv.ParseFloat. Pada asasnya, jumlah masa pelaksanaannya ialah 23 saat (kira-kira 23% daripada jumlah masa).

Masalah dengan fungsi ini ialah ia adalah generik, iaitu, ia dibuat untuk berfungsi dengan mana-mana apungan yang sah. Walau bagaimanapun, data kami mempunyai format suhu yang sangat khusus. Lihat contoh di bawah:

Antananarivo;15.6
Iqaluit;-20.7
Dolisie;5.8
Kuopio;-6.8

Format suhu sentiasa sama: satu atau dua digit sebelum titik dan satu digit selepas titik, dan mungkin termasuk tanda tolak pada permulaan. Oleh itu, kita boleh mencipta fungsi yang menukar nilai dengan cara tertentu, mengoptimumkan proses tanpa perlu melakukan semua semakan generik ParseFloat.

func bytesToTemp(b []byte) float64 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return float64(v*isNeg) / 10
}

Untuk membaca data dalam format bait dan bukannya rentetan, saya menukar pengembalian Pengimbas daripada rentetan kepada bait

line := scanner.Bytes()
before, after, _ := bytes.Cut(line, []byte{';'})
name := string(before)
temp := bytesToTemp(after)

Perubahan kecil ini menurunkan masa pelaksanaan kepada 75 saat.

Membaca potongan data yang lebih besar

Kelebihan terbesar menggunakan Scan ialah program tidak perlu memuatkan keseluruhan fail sekaligus ke dalam memori. Sebaliknya, ia membolehkan anda membaca baris demi baris, prestasi dagangan untuk ingatan.

Perlu ambil perhatian bahawa terdapat kompromi antara membaca satu baris pada satu masa dan memuatkan 14 GB data sekaligus. Jalan tengah ini membaca ketulan, yang merupakan kepingan ingatan. Dengan cara ini, daripada membaca keseluruhan fail sekaligus, kita boleh membaca blok 128 MB.

buf := make([]byte, chunkSize)
reader := bufio.NewReader(file)
var leftData []byte
for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]

    lines := bytes.Split(chunk[:lastIndex], []byte{'\n'})

    for _, line := range lines {
        before, after, _ := bytes.Cut(line, []byte{';'})
        name := string(before)
        temp := bytesToTemp(after)

        loc, ok := m[name]
        if !ok {
            loc = NewLocation()
            m[name] = loc
        }
        loc.Add(temp)
    }
}

Akibatnya, masa pelaksanaan menurun kepada 70 saat. Lebih baik daripada sebelumnya, tetapi masih ada ruang untuk diperbaiki.

Menukar jenis data

Memang fakta bahawa keseluruhan cabaran berkisar pada nombor dengan tempat perpuluhan. Walau bagaimanapun, menangani mata terapung sentiasa menjadi cabaran besar (lihat IEEE-754). Dalam kes itu, mengapa tidak menggunakan integer untuk mewakili suhu?

type Location struct {
    min   int16
    max   int16
    sum   int32
    count int32
}

Seperti yang ditakrifkan sebelum ini, suhu sentiasa diwakili sehingga tiga digit. Oleh itu, mengalih keluar koma, nilai boleh berbeza antara -999 dan 999, jadi int16 sudah cukup untuk mewakilinya. Untuk menjumlahkan dan mengira, int32 adalah lebih daripada mencukupi, kerana jenis ini boleh berbeza antara -2147483648 dan 2147483647.

Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp. Para isso, mudamos o retorno para int16 e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.

func bytesToTemp(b []byte) int16 {
    var v int16
    var isNeg int16 = 1

    for i := 0; i < len(b)-1; i++ {
        char := b[i]
        if char == '-' {
            isNeg = -1
        } else if char == '.' {
            digit := int16(b[i+1] - '0')
            v = v*10 + digit
        } else {
            digit := int16(char - '0')
            v = v*10 + digit
        }
    }

    return v * isNeg
}

Para finalizar, modifiquei a função Add para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos. Não é muito, mas uma vitória é uma vitória.

Melhorando a Performance da Conversão de Bytes para String

Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes).

Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.

Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe por um motivo).

name := unsafe.String(unsafe.SliceData(before), len(before))

Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe pode se tornar bem inseguro.

Com essa mudança, reduzimos o tempo de execução para 51 segundos. Nada mal.

Reimplementando bytes.Cut

Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?

Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?

idx := 0
if line[len(line)-4] == ';' {
    idx = len(line) - 4
} else if line[len(line)-5] == ';' {
    idx = len(line) - 5
} else {
    idx = len(line) - 6
}

before := line[:idx]
after := line[idx+1:]

Com isso, o tempo de execução foi para 46 segundos, cerca de 5 segundos a menos que antes (quem diria, não é?).

Paralelismo para acelerar o processamento

Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?

Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.

Vale destacar que workers, nesse caso, é uma constante que define quantas goroutines serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.

chunkChan := make(chan []byte, workers)
var wg sync.WaitGroup
wg.Add(workers)

O próximo passo foi criar as goroutines que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.

for i := 0; i < workers; i++ {
    go func() {
        for chunk := range chunkChan {
            lines := bytes.Split(chunk, []byte{'\n'})
            for _, line := range lines {
                before, after := parseLine(line)
                name := unsafe.String(unsafe.SliceData(before), len(before))
                temp := bytesToTemp(after)

                loc, ok := m[name]
                if !ok {
                    loc = NewLocation()
                    m[name] = loc
                }
                loc.Add(temp)
            }
        }
        wg.Done()
    }()
}

Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:

for {
    n, err := reader.Read(buf)
    if err != nil {
        if err == io.EOF {
            break
        }
        panic(err)
    }

    chunk := append(leftData, buf[:n]...)
    lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
    leftData = chunk[lastIndex+1:]
    chunkChan <- chunk[:lastIndex]
}
close(chunkChan)
wg.Wait()

Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.

Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.

A segunda alternativa consiste em criar um mapa para cada uma das goroutines, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.

close(chunkChan)

go func() {
    wg.Wait()
    close(mapChan)
}()

keys := make([]string, 0, 825)

m := map[string]*Location{}
for lm := range mapChan {
    for lk, lLoc := range lm {
        loc, ok := m[lk]
        if !ok {
            keys = append(keys, lk)
            m[lk] = lLoc
            continue
        }

        if lLoc.min < loc.min {
            loc.min = lLoc.min
        }
        if lLoc.max > loc.max {
            loc.max = lLoc.max
        }
        loc.sum += lLoc.sum
        loc.count += lLoc.count
    }
}

Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.

Enfim, o nosso tempo de processamento caiu de 46 segundos para... 12 segundos.

Otimizando a quebra de linhas no chunk

Todas as vezes que eu analisava o profile, a função bytes.Split chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?

Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.

start := 0
start := 0
for end, b := range chunk {
    if b != '\n' {
        continue
    }
    before, after := parseLine(chunk[start:end])
    // ...
    start = end + 1
}

Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos.

Executed in    8.45 secs    fish           external
   usr time   58.47 secs  152.00 micros   58.47 secs
   sys time    4.52 secs  136.00 micros    4.52 secs

Próximos passos

Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.

No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.

Atas ialah kandungan terperinci Menyelesaikan cabaran bilion talian dalam Go (dari hingga s). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Artikel sebelumnya:Langkah bayi dengan GoArtikel seterusnya:Langkah bayi dengan Go