Maison >développement back-end >Golang >Problème de fusion externe – Guide complet pour les Gophers
Le problème du tri externe est un sujet bien connu dans les cours d'informatique et est souvent utilisé comme outil pédagogique. Cependant, il est rare de rencontrer quelqu'un qui a réellement implémenté une solution à ce problème dans le code pour un scénario technique spécifique, sans parler des optimisations requises. Relever ce défi lors d'un hackathon m'a inspiré pour écrire cet article.
Voici donc la tâche du hackathon :
Vous disposez d'un simple fichier texte avec des adresses IPv4. Une ligne est une adresse, ligne par ligne :
145.67.23.4 8.34.5.23 89.54.3.124 89.54.3.124 3.45.71.5 ...
Le fichier est de taille illimitée et peut occuper des dizaines et des centaines de gigaoctets.
Vous devez calculer le nombre d'adresses uniques dans ce fichier en utilisant le moins de mémoire et de temps possible. Il existe un algorithme "naïf" pour résoudre ce problème (lire ligne par ligne, mettre les lignes dans HashSet). C'est mieux si votre implémentation est plus compliquée et plus rapide que cet algorithme naïf.
Un fichier de 120 Go avec 8 milliards de lignes a été soumis pour analyse.
Il n'y avait aucune exigence spécifique concernant la vitesse d'exécution du programme. Cependant, après avoir rapidement examiné les informations disponibles sur le sujet en ligne, j'ai conclu qu'un temps d'exécution acceptable pour du matériel standard (comme un PC domestique) serait d'environ une heure ou moins.
Pour des raisons évidentes, le fichier ne peut être lu et traité dans son intégralité que si le système dispose d'au moins 128 Go de mémoire disponible. Mais travailler avec des morceaux et fusionner est-il inévitable ?
Si vous n'êtes pas à l'aise avec la mise en œuvre d'une fusion externe, je vous suggère d'abord de vous familiariser avec une solution alternative acceptable, bien que loin d'être optimale.
Créez un bitmap 2^32 bits. Il s'agit d'un tableau uint64, puisque uint64 contient 64 bits.
Pour chaque IP :
Avantages :
Détection d'unicité très rapide : mise à 1 du bit O(1), pas besoin de vérifier, il suffit de le mettre.
Aucune surcharge pour le hachage, le tri, etc.
Inconvénients :
Consommation de mémoire énorme (512 Mo pour tout l'espace IPv4, sans tenir compte de la surcharge).
Si le fichier est volumineux, mais plus petit que l'espace IPv4 complet, cela peut quand même être avantageux en termes de temps, mais pas toujours raisonnable en termes de mémoire.
package main import ( "bufio" "fmt" "os" "strconv" "strings" "math/bits" ) // Parse IP address "A.B.C.D" => uint32 number func ipToUint32(ipStr string) (uint32, error) { parts := strings.Split(ipStr, ".") if len(parts) != 4 { return 0, fmt.Errorf("invalid IP format") } var ipNum uint32 for i := 0; i < 4; i++ { val, err := strconv.Atoi(parts[i]) if err != nil || val < 0 || val > 255 { return 0, fmt.Errorf("invalid IP octet: %v", parts[i]) } ipNum = (ipNum << 8) | uint32(val) } return ipNum, nil } func popcount64(x uint64) int { return bits.OnesCount64(x) } func main() { filePath := "ips.txt" file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) return } defer file.Close() // IPv4 space size: 2^32 = 4,294,967,296 // We need 2^32 bits, that is (2^32)/64 64-bit words totalBits := uint64(1) << 32 // 2^32 arraySize := totalBits / 64 //how many uint64 do we need bitset := make([]uint64, arraySize) scanner := bufio.NewScanner(file) for scanner.Scan() { ipStr := scanner.Text() ipNum, err := ipToUint32(ipStr) if err != nil { fmt.Printf("Incorrect IP: %s\n", ipStr) continue } idx := ipNum / 64 bit := ipNum % 64 mask := uint64(1) << bit // Setting the bit bitset[idx] |= mask } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) return } count := 0 for _, val := range bitset { count += bits.OnesCount64(val) } fmt.Printf("Number of unique IP addresses: %d\n", count) }
Cette approche est simple et fiable, ce qui en fait une option viable lorsqu'aucune alternative n'est disponible. Cependant, dans un environnement de production, en particulier lorsque l'on vise des performances optimales, il est essentiel de développer une solution plus efficace.
Ainsi, notre approche implique le découpage, le tri de fusion interne et la déduplication.
Le fichier est divisé en parties relativement petites (morceaux), disons quelques centaines de mégaoctets ou quelques gigaoctets. Pour chaque morceau :
Une goroutine (ou un pool de goroutines) est lancée, qui lit le morceau, analyse les adresses IP en nombres et les stocke dans un tableau temporaire en mémoire.
Ensuite, ce tableau est trié (par exemple, avec le tri standard.Slice), et le résultat, après suppression des doublons, est écrit dans un fichier temporaire.
Étant donné que chaque partie peut être traitée indépendamment, vous pouvez exécuter plusieurs de ces gestionnaires en parallèle, si vous disposez de plusieurs cœurs de processeur et d'une bande passante disque suffisante. Cela vous permettra d'utiliser les ressources le plus efficacement possible.
Une fois que tous les morceaux sont triés et écrits dans des fichiers temporaires, vous devez fusionner ces listes triées en un seul flux trié, en supprimant les doublons :
Semblable au processus de tri externe, vous pouvez paralléliser la fusion en divisant plusieurs fichiers temporaires en groupes, en les fusionnant en parallèle et en réduisant progressivement le nombre de fichiers.
Cela laisse un grand flux de sortie trié et dédupliqué, à partir duquel vous pouvez calculer le nombre total d'adresses IP uniques.
Avantages de la parallélisation :
Utilisation de plusieurs cœurs de processeur :
Le tri monothread d'un très grand tableau peut être lent, mais si vous disposez d'un processeur multicœur, vous pouvez trier plusieurs morceaux en parallèle, accélérant ainsi le processus plusieurs fois.
Équilibrage de charge :
Si la taille des morceaux est choisie judicieusement, chaque morceau peut être traité dans à peu près le même laps de temps. Si certains morceaux sont plus gros/plus petits ou plus complexes, vous pouvez répartir dynamiquement leur traitement sur différentes goroutines.
La parallélisation permet de lire un morceau pendant qu'un autre est trié ou écrit, réduisant ainsi le temps d'inactivité.
Le tri externe se prête naturellement à la parallélisation via le regroupement de fichiers. Cette approche permet une utilisation efficace des processeurs multicœurs et minimise les goulots d'étranglement des E/S, ce qui se traduit par un tri et une déduplication nettement plus rapides par rapport à une approche monothread. En répartissant efficacement la charge de travail, vous pouvez atteindre des performances élevées même lorsque vous traitez des ensembles de données volumineux.
Considération importante :
En lisant le fichier ligne par ligne, on peut également compter le nombre total de lignes. Au cours du processus, nous effectuons la déduplication en deux étapes : d'abord lors du chunking, puis lors de la fusion. Par conséquent, il n’est pas nécessaire de compter les lignes dans le fichier de sortie final. Au lieu de cela, le nombre total de lignes uniques peut être calculé comme :
finalCount := totalLines - (DeletedInChunks DeletedInMerge)
Cette approche évite les opérations redondantes et rend le calcul plus efficace en gardant une trace des suppressions à chaque étape de la déduplication. Cela nous fait gagner des minutes de service.
Encore une chose :
Étant donné que tout petit gain de performances est important sur d'énormes quantités de données, je suggère d'utiliser un analogue accéléré auto-écrit de strings.Slice()
145.67.23.4 8.34.5.23 89.54.3.124 89.54.3.124 3.45.71.5 ...
De plus, un modèle de travailleur a été adopté pour gérer le traitement parallèle, le nombre de threads étant configurable. Par défaut, le nombre de threads est défini sur runtime.NumCPU(), permettant au programme d'utiliser efficacement tous les cœurs de processeur disponibles. Cette approche garantit une utilisation optimale des ressources tout en offrant également la flexibilité d'ajuster le nombre de threads en fonction des exigences spécifiques ou des limitations de l'environnement.
Remarque importante : Lors de l'utilisation du multithreading, il est crucial de protéger les données partagées pour éviter les conditions de concurrence et garantir l'exactitude du programme. Ceci peut être réalisé en utilisant des mécanismes de synchronisation tels que des mutex, des canaux (en Go) ou d'autres techniques sécurisées pour la concurrence, en fonction des exigences spécifiques de votre implémentation.
La mise en œuvre de ces idées a abouti à un code qui, lorsqu'il est exécuté sur un processeur Ryzen 7700 associé à un SSD M.2, a accompli la tâche en environ 40 minutes.
La considération suivante, basée sur le volume de données et donc la présence d'opérations de disque importantes, était l'utilisation de la compression. L'algorithme de Brotli a été choisi pour la compression. Son taux de compression élevé et sa décompression efficace en font un choix approprié pour réduire la surcharge des E/S du disque tout en conservant de bonnes performances pendant le stockage et le traitement intermédiaires.
Voici l'exemple de chunking avec Brotli :
package main import ( "bufio" "fmt" "os" "strconv" "strings" "math/bits" ) // Parse IP address "A.B.C.D" => uint32 number func ipToUint32(ipStr string) (uint32, error) { parts := strings.Split(ipStr, ".") if len(parts) != 4 { return 0, fmt.Errorf("invalid IP format") } var ipNum uint32 for i := 0; i < 4; i++ { val, err := strconv.Atoi(parts[i]) if err != nil || val < 0 || val > 255 { return 0, fmt.Errorf("invalid IP octet: %v", parts[i]) } ipNum = (ipNum << 8) | uint32(val) } return ipNum, nil } func popcount64(x uint64) int { return bits.OnesCount64(x) } func main() { filePath := "ips.txt" file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) return } defer file.Close() // IPv4 space size: 2^32 = 4,294,967,296 // We need 2^32 bits, that is (2^32)/64 64-bit words totalBits := uint64(1) << 32 // 2^32 arraySize := totalBits / 64 //how many uint64 do we need bitset := make([]uint64, arraySize) scanner := bufio.NewScanner(file) for scanner.Scan() { ipStr := scanner.Text() ipNum, err := ipToUint32(ipStr) if err != nil { fmt.Printf("Incorrect IP: %s\n", ipStr) continue } idx := ipNum / 64 bit := ipNum % 64 mask := uint64(1) << bit // Setting the bit bitset[idx] |= mask } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) return } count := 0 for _, val := range bitset { count += bits.OnesCount64(val) } fmt.Printf("Number of unique IP addresses: %d\n", count) }
L'efficacité de la compression est discutable et fortement dépendante des conditions dans lesquelles la solution est utilisée. Une compression élevée réduit l'utilisation de l'espace disque mais augmente proportionnellement le temps d'exécution global. Sur les disques durs lents, la compression peut augmenter considérablement la vitesse, car les E/S disque deviennent le goulot d'étranglement. À l’inverse, sur les SSD rapides, la compression peut entraîner des temps d’exécution plus lents.
Lors de tests effectués sur un système équipé de SSD M.2, la compression n'a montré aucune amélioration des performances. En conséquence, j’ai finalement décidé d’y renoncer. Cependant, si vous êtes prêt à risquer d'ajouter de la complexité à votre code et potentiellement de réduire sa lisibilité, vous pouvez implémenter la compression en tant que fonctionnalité facultative, contrôlée par un indicateur configurable.
Dans la poursuite d'une optimisation plus poussée, nous tournons notre attention vers la transformation binaire de notre solution. Une fois les adresses IP textuelles converties en hachages numériques, toutes les opérations ultérieures peuvent être effectuées au format binaire.
145.67.23.4 8.34.5.23 89.54.3.124 89.54.3.124 3.45.71.5 ...
Avantages du format binaire
Chaque nombre occupe une taille fixe (par exemple, uint32 = 4 octets).
Pour 1 million d'adresses IP, la taille du fichier ne sera que d'environ 4 Mo.
Il n'est pas nécessaire d'analyser les chaînes, ce qui accélère les opérations de lecture et d'écriture.
En utilisant un ordre d'octets cohérent (LittleEndian ou BigEndian), les fichiers peuvent être lus sur différentes plates-formes.
Conclusion
Le stockage des données au format binaire est une méthode plus efficace pour écrire et lire des nombres. Pour une optimisation complète, convertissez les processus d'écriture et de lecture des données au format binaire. Utilisez binaire.Write pour l'écriture et binaire.Read pour la lecture.
Voici à quoi pourrait ressembler la fonction processChunk pour fonctionner avec le format binaire :
package main import ( "bufio" "fmt" "os" "strconv" "strings" "math/bits" ) // Parse IP address "A.B.C.D" => uint32 number func ipToUint32(ipStr string) (uint32, error) { parts := strings.Split(ipStr, ".") if len(parts) != 4 { return 0, fmt.Errorf("invalid IP format") } var ipNum uint32 for i := 0; i < 4; i++ { val, err := strconv.Atoi(parts[i]) if err != nil || val < 0 || val > 255 { return 0, fmt.Errorf("invalid IP octet: %v", parts[i]) } ipNum = (ipNum << 8) | uint32(val) } return ipNum, nil } func popcount64(x uint64) int { return bits.OnesCount64(x) } func main() { filePath := "ips.txt" file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) return } defer file.Close() // IPv4 space size: 2^32 = 4,294,967,296 // We need 2^32 bits, that is (2^32)/64 64-bit words totalBits := uint64(1) << 32 // 2^32 arraySize := totalBits / 64 //how many uint64 do we need bitset := make([]uint64, arraySize) scanner := bufio.NewScanner(file) for scanner.Scan() { ipStr := scanner.Text() ipNum, err := ipToUint32(ipStr) if err != nil { fmt.Printf("Incorrect IP: %s\n", ipStr) continue } idx := ipNum / 64 bit := ipNum % 64 mask := uint64(1) << bit // Setting the bit bitset[idx] |= mask } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) return } count := 0 for _, val := range bitset { count += bits.OnesCount64(val) } fmt.Printf("Number of unique IP addresses: %d\n", count) }WTF ?! C'est devenu beaucoup plus lent !!
Au format binaire, le fonctionnement est devenu plus lent. Un fichier de 100 millions de lignes (adresses IP) est traité sous forme binaire en 4,5 minutes, contre 25 secondes en texte. Avec une taille de morceau et un nombre de travailleurs égaux. Pourquoi ?
Travailler avec le format binaire peut être plus lent qu'avec le format texte
L'utilisation du format binaire peut parfois être plus lente que le format texte en raison des spécificités du fonctionnement de Binary.Read et de Binary.Write, ainsi que des inefficacités potentielles dans leur mise en œuvre. Voici les principales raisons pour lesquelles cela pourrait arriver :Opérations d'E/S
Fonctionne avec des blocs de données plus volumineux à l'aide de bufio.Scanner, optimisé pour la lecture de lignes.
Lit des lignes entières et les analyse, ce qui peut être plus efficace pour les petites opérations de conversion.
binary.Read lit 4 octets à la fois, ce qui entraîne de petites opérations d'E/S plus fréquentes.
Les appels fréquents à binaire.Read augmentent la surcharge due au basculement entre l'espace utilisateur et l'espace système.
Solution : Utilisez un tampon pour lire plusieurs nombres à la fois.
func fastSplit(s string) []string { n := 1 c := DelimiterByte for i := 0; i < len(s); i++ { if s[i] == c { n++ } } out := make([]string, n) count := 0 begin := 0 length := len(s) - 1 for i := 0; i <= length; i++ { if s[i] == c { out[count] = s[begin:i] count++ begin = i + 1 } } out[count] = s[begin : length+1] return out }
Pourquoi la mise en mémoire tampon améliore-t-elle les performances ?
Moins d'opérations d'E/S :
Au lieu d'écrire chaque numéro directement sur le disque, les données sont accumulées dans un tampon et écrites en blocs plus grands.
Frais généraux réduits :
Chaque opération d'écriture sur disque entraîne une surcharge en raison du changement de contexte entre le processus et le système d'exploitation. La mise en mémoire tampon réduit le nombre de ces appels.
Nous présentons également le code de fusion binaire multiphase :
145.67.23.4 8.34.5.23 89.54.3.124 89.54.3.124 3.45.71.5 ...
Le résultat est fantastique : 14 min pour un fichier de 110 Go avec 8 milliards de lignes !
C'est un résultat exceptionnel ! Traiter un fichier de 110 Go comportant 8 milliards de lignes en 14 minutes est en effet impressionnant. Cela démontre le pouvoir de :
En traitant de gros morceaux de données en mémoire au lieu de ligne par ligne ou valeur par valeur, vous réduisez considérablement le nombre d'opérations d'E/S, qui constituent souvent le goulot d'étranglement.
Le passage à la lecture et à l'écriture binaires minimise la surcharge d'analyse, réduit la taille des données intermédiaires et améliore l'efficacité de la mémoire.
L'utilisation d'algorithmes économes en mémoire pour la déduplication et le tri garantit que les cycles du processeur sont utilisés efficacement.
Tirer parti des goroutines et des canaux pour paralléliser la charge de travail entre les travailleurs équilibre l'utilisation du processeur et du disque.
Enfin, voici le code complet de la solution finale. N'hésitez pas à l'utiliser et à l'adapter à vos besoins !
Solution de fusion externe pour Gophers
Bonne chance !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!