얼마 전 한 친구가 10억 줄의 파일을 읽는 도전에 대해 이야기해주었습니다. 나는 그 아이디어가 매우 흥미롭다고 생각했지만, 대학 시험 주간이었기 때문에 결국 나중에 보려고 남겨 두었습니다. 몇 달 후, 테오의 챌린지 영상을 보고 좀 더 자세히 살펴보기로 했습니다.
10억 행 챌린지의 목적은 도시 목록의 최저, 최고 및 평균 기온을 계산하는 것입니다. 세부 사항은 이 목록에 10억 개의 항목이 있으며 각 항목은 도시 이름으로 구성된다는 것입니다. 온도, 각 도시는 두 번 이상 나타날 수 있습니다. 마지막으로 프로그램은 이러한 값을 도시 이름에 따라 알파벳 순서로 표시해야 합니다.
보상이 없더라도 문제를 해결하려고 노력하는 것이 재미있을 것이라고 생각했습니다. 어쨌든 저는 제 과정을 설명하는 글을 썼습니다.
더 복잡한 문제를 해결해야 할 때마다 첫 번째 목표는 프로그램을 작동시키는 것입니다. 가장 빠르거나 깨끗한 코드는 아닐 수도 있지만 작동하는 코드입니다.
기본적으로 저는 최저 및 최고 기온, 기온의 합, 도시가 목록에 나타나는 횟수를 포함하는 목록의 각 도시를 나타내는 위치 구조를 만들었습니다(마지막 두 개는 평균을 계산하는 데 필요합니다). . 온도 수와 그 합계를 저장하지 않고도 평균을 직접 계산할 수 있는 방법이 있다는 것을 알고 있습니다. 하지만 앞서 말했듯이 목표는 코드를 작동하게 만드는 것이었습니다.
데이터 목록은 도시 이름과 기온, 세미콜론으로 구분되어 구성됩니다. 예:
Antananarivo;15.6 Iqaluit;-20.7 Dolisie;25.8 Kuopio;-6.8
데이터를 읽는 가장 간단한 방법은 한 번에 한 줄씩 읽을 수 있는 스캔을 사용하는 것입니다. 줄을 사용하면 세미콜론 앞과 뒤의 값이라는 두 부분으로 나눌 수 있습니다. 온도를 얻으려면 문자열을 부동 소수점으로 변환하는 strconv.ParseFloat를 사용할 수 있습니다. 첫 번째 구현의 전체 코드는 아래에서 볼 수 있습니다.
package main import ( "bufio" "fmt" "math" "os" "sort" "strconv" "strings" ) type Location struct { min float64 max float64 sum float64 count int } func NewLocation() *Location { return &Location{ min: math.MaxInt16, max: math.MinInt16, sum: 0, count: 0, } } func (loc *Location) Add(temp float64) { if temp < loc.min { loc.min = temp } else if temp > loc.max { loc.max = temp } loc.sum += temp loc.count += 1 } var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") func main() { flag.Parse() if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } file, _ := os.Open("./measurements.txt") defer file.Close() m := map[string]*Location{} scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() name, tempStr, _ := strings.Cut(line, ";") temp, _ := strconv.ParseFloat(tempStr, 32) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) for _, name := range keys { loc := m[name] mean := loc.sum / float64(loc.count) fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max) } }
이 간단한 버전은 실행하는 데 약 97초가 걸렸습니다.
실행 프로필을 분석하면서 가장 큰 병목 현상 중 하나가 strconv.ParseFloat 함수라는 것을 깨달았습니다. 기본적으로 전체 실행 시간은 23초(전체 시간의 약 23%)였습니다.
이 함수의 문제점은 일반적이라는 것입니다. 즉, 유효한 부동 소수점과 함께 작동하도록 만들어졌습니다. 그러나 우리 데이터에는 매우 구체적인 온도 형식이 있습니다. 아래 예를 참조하세요.
Antananarivo;15.6 Iqaluit;-20.7 Dolisie;5.8 Kuopio;-6.8
온도 형식은 항상 동일합니다. 점 앞의 한 자리 또는 두 자리, 점 뒤의 한 자리이며 시작 부분에 빼기 기호가 포함될 수 있습니다. 따라서 ParseFloat의 일반적인 검사를 모두 수행할 필요 없이 프로세스를 최적화하여 특정 방식으로 값을 변환하는 함수를 만들 수 있습니다.
func bytesToTemp(b []byte) float64 { var v int16 var isNeg int16 = 1 for i := 0; i < len(b)-1; i++ { char := b[i] if char == '-' { isNeg = -1 } else if char == '.' { digit := int16(b[i+1] - '0') v = v*10 + digit } else { digit := int16(char - '0') v = v*10 + digit } } return float64(v*isNeg) / 10 }
문자열 대신 바이트 형식으로 데이터를 읽기 위해 스캐너의 반환을 문자열에서 바이트로 변경했습니다
line := scanner.Bytes() before, after, _ := bytes.Cut(line, []byte{';'}) name := string(before) temp := bytesToTemp(after)
이런 작은 변화로 인해 실행 시간이 75초로 단축되었습니다.
Scan을 사용하는 가장 큰 장점은 프로그램이 전체 파일을 한 번에 메모리에 로드할 필요가 없다는 것입니다. 대신 한 줄씩 읽어 성능을 메모리로 교환할 수 있습니다.
한 번에 한 줄씩 읽는 것과 한 번에 14GB의 데이터를 로드하는 것 사이에는 절충안이 있다는 점에 유의하는 것이 중요합니다. 이 중간 지점은 메모리 조각인 청크를 읽는 것입니다. 이렇게 하면 전체 파일을 한 번에 읽는 대신 128MB의 블록을 읽을 수 있습니다.
buf := make([]byte, chunkSize) reader := bufio.NewReader(file) var leftData []byte for { n, err := reader.Read(buf) if err != nil { if err == io.EOF { break } panic(err) } chunk := append(leftData, buf[:n]...) lastIndex := bytes.LastIndex(chunk, []byte{'\n'}) leftData = chunk[lastIndex+1:] lines := bytes.Split(chunk[:lastIndex], []byte{'\n'}) for _, line := range lines { before, after, _ := bytes.Cut(line, []byte{';'}) name := string(before) temp := bytesToTemp(after) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } }
그 결과 실행 시간이 70초로 단축되었습니다. 이전보다 나아졌지만 여전히 개선의 여지가 있습니다.
전체 챌린지가 소수점 이하 자릿수를 중심으로 진행된다는 사실입니다. 그러나 부동 소수점을 다루는 것은 항상 큰 과제입니다(IEEE-754 참조). 그렇다면 온도를 정수로 표현하면 어떨까요?
type Location struct { min int16 max int16 sum int32 count int32 }
앞서 정의한 대로 온도는 항상 최대 3자리 숫자로 표시됩니다. 따라서 쉼표를 제거하면 -999에서 999까지 값이 달라질 수 있으므로 int16이면 충분합니다. 합산 및 계산에는 int32이면 충분합니다. 이 유형은 -2147483648에서 2147483647 사이에서 달라질 수 있기 때문입니다.
Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp. Para isso, mudamos o retorno para int16 e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.
func bytesToTemp(b []byte) int16 { var v int16 var isNeg int16 = 1 for i := 0; i < len(b)-1; i++ { char := b[i] if char == '-' { isNeg = -1 } else if char == '.' { digit := int16(b[i+1] - '0') v = v*10 + digit } else { digit := int16(char - '0') v = v*10 + digit } } return v * isNeg }
Para finalizar, modifiquei a função Add para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos. Não é muito, mas uma vitória é uma vitória.
Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes).
Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.
Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe por um motivo).
name := unsafe.String(unsafe.SliceData(before), len(before))
Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe pode se tornar bem inseguro.
Com essa mudança, reduzimos o tempo de execução para 51 segundos. Nada mal.
Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?
Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?
idx := 0 if line[len(line)-4] == ';' { idx = len(line) - 4 } else if line[len(line)-5] == ';' { idx = len(line) - 5 } else { idx = len(line) - 6 } before := line[:idx] after := line[idx+1:]
Com isso, o tempo de execução foi para 46 segundos, cerca de 5 segundos a menos que antes (quem diria, não é?).
Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?
Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.
Vale destacar que workers, nesse caso, é uma constante que define quantas goroutines serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.
chunkChan := make(chan []byte, workers) var wg sync.WaitGroup wg.Add(workers)
O próximo passo foi criar as goroutines que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.
for i := 0; i < workers; i++ { go func() { for chunk := range chunkChan { lines := bytes.Split(chunk, []byte{'\n'}) for _, line := range lines { before, after := parseLine(line) name := unsafe.String(unsafe.SliceData(before), len(before)) temp := bytesToTemp(after) loc, ok := m[name] if !ok { loc = NewLocation() m[name] = loc } loc.Add(temp) } } wg.Done() }() }
Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:
for { n, err := reader.Read(buf) if err != nil { if err == io.EOF { break } panic(err) } chunk := append(leftData, buf[:n]...) lastIndex := bytes.LastIndex(chunk, []byte{'\n'}) leftData = chunk[lastIndex+1:] chunkChan <- chunk[:lastIndex] } close(chunkChan) wg.Wait()
Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.
Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.
A segunda alternativa consiste em criar um mapa para cada uma das goroutines, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.
close(chunkChan) go func() { wg.Wait() close(mapChan) }() keys := make([]string, 0, 825) m := map[string]*Location{} for lm := range mapChan { for lk, lLoc := range lm { loc, ok := m[lk] if !ok { keys = append(keys, lk) m[lk] = lLoc continue } if lLoc.min < loc.min { loc.min = lLoc.min } if lLoc.max > loc.max { loc.max = lLoc.max } loc.sum += lLoc.sum loc.count += lLoc.count } }
Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.
Enfim, o nosso tempo de processamento caiu de 46 segundos para... 12 segundos.
Todas as vezes que eu analisava o profile, a função bytes.Split chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?
Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.
start := 0 start := 0 for end, b := range chunk { if b != '\n' { continue } before, after := parseLine(chunk[start:end]) // ... start = end + 1 }
Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos.
Executed in 8.45 secs fish external usr time 58.47 secs 152.00 micros 58.47 secs sys time 4.52 secs 136.00 micros 4.52 secs
Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.
No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.
위 내용은 Go에서 10억줄 문제 해결하기 (from s)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!