Home >Backend Development >Python Tutorial >One article to understand the problem of large file sorting/external memory sorting

One article to understand the problem of large file sorting/external memory sorting

藏色散人
藏色散人forward
2021-07-14 14:01:233774browse

Question 1: A file contains 500 million lines, each line is a random integer, and all integers in the file need to be sorted.

Divide&Conquer (pide&Conquer), ReferenceBig data algorithm: Sorting 500 million data

For this one Total.txt with 500000000 lines is sorted, and the file size is 4.6G.

Every time 10,000 lines are read, sort and write them into a new subfile (quick sort is used here).

1. Split & Sort

#!/usr/bin/python2.7

import time

def readline_by_yield(bfile):
    with open(bfile, 'r') as rf:
        for line in rf:
            yield line

def quick_sort(lst):
    if len(lst) < 2:
        return lst
    pivot = lst[0]
    left = [ ele for ele in lst[1:] if ele < pivot ]
    right = [ ele for ele in lst[1:] if ele >= pivot ]
    return quick_sort(left) + [pivot,] + quick_sort(right)

def split_bfile(bfile):
    count = 0
    nums = []
    for line in readline_by_yield(bfile):
        num = int(line)
        if num not in nums:
            nums.append(num)
        if 10000 == len(nums):
            nums = quick_sort(nums)
            with open(&#39;subfile/subfile{}.txt&#39;.format(count+1),&#39;w&#39;) as wf:
                wf.write(&#39;\n&#39;.join([ str(i) for i in nums ]))
            nums[:] = []
            count += 1
            print count

now = time.time()
split_bfile(&#39;total.txt&#39;)
run_t = time.time()-now
print &#39;Runtime : {}&#39;.format(run_t)

will generate 50,000 small files, each small file is about 96K in size.

During the execution of the program, the memory usage has been around 5424kB

It takes 94146 seconds to split the entire file.

2. Merge

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

import os
import time

testdir = &#39;/ssd/subfile&#39;

now = time.time() 

# Step 1 : 获取全部文件描述符
fds = []
for f in os.listdir(testdir):
    ff = os.path.join(testdir,f)
    fds.append(open(ff,&#39;r&#39;))

# Step 2 : 每个文件获取第一行,即当前文件最小值
nums = []
tmp_nums = []
for fd in fds:
    num = int(fd.readline())
    tmp_nums.append(num)

# Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。
count = 0
while 1:
    val = min(tmp_nums)
    nums.append(val)
    idx = tmp_nums.index(val)
    next = fds[idx].readline()
    # 文件读完了
    if not next:
        del fds[idx]
        del tmp_nums[idx]
    else:
        tmp_nums[idx] = int(next)
    # 暂存区保存1000个数,一次性写入硬盘,然后清空继续读。
    if 1000 == len(nums):
        with open(&#39;final_sorted.txt&#39;,&#39;a&#39;) as wf:
            wf.write(&#39;\n&#39;.join([ str(i) for i in nums ]) + &#39;\n&#39;)
        nums[:] = []
    if 499999999 == count:
        break
    count += 1
   
with open(&#39;runtime.txt&#39;,&#39;w&#39;) as wf:
    wf.write(&#39;Runtime : {}&#39;.format(time.time()-now))

During the execution of the program, the memory usage has been around 240M

It took about 38 hours to merge less than 50 million rows of data...

Although the memory usage is reduced, the time is complicated The degree is too high; You can further reduce memory usage by reducing the number of files (increasing the number of rows stored in each small file).

Question 2: A file has 100 billion lines of data, each line is an IP address, and the IP addresses need to be sorted.

Convert IP address to numbers

# 方法一:手动计算
 
In [62]: ip
Out[62]: &#39;10.3.81.150&#39;
 
In [63]: ip.split(&#39;.&#39;)[::-1]
Out[63]: [&#39;150&#39;, &#39;81&#39;, &#39;3&#39;, &#39;10&#39;]
 
In [64]: [ &#39;{}-{}&#39;.format(idx,num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1]) ]
Out[64]: [&#39;0-150&#39;, &#39;1-81&#39;, &#39;2-3&#39;, &#39;3-10&#39;]
 
In [65]: [256**idx*int(num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1])]
Out[65]: [150, 20736, 196608, 167772160]
 
In [66]: sum([256**idx*int(num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1])])                     
Out[66]: 167989654 
In [67]:
 
# 方法二:使用C扩展库来计算
In [71]: import socket,struct
In [72]: socket.inet_aton(ip)
Out[72]: b&#39;\n\x03Q\x96&#39;
 
In [73]: struct.unpack("!I", socket.inet_aton(ip))
# !表示使用网络字节顺序解析, 后面的I表示unsigned int, 对应Python里的integer or long 
Out[73]: (167989654,)
 
In [74]: struct.unpack("!I", socket.inet_aton(ip))[0]
Out[74]: 167989654
 
In [75]: socket.inet_ntoa(struct.pack("!I", 167989654))              
Out[75]: &#39;10.3.81.150&#39;
 
In [76]:

Question 3: There is a 1.3GB file (a total of 100 million lines). Each line in it is a string. Please find duplicates in the file. The most frequent string.

Basic idea: Read large files iteratively, split the large files into multiple small files; and finally merge these small files.

Splitting rules:

Read large files iteratively, maintain a dictionary in memory, the key is a string, and the value is the number of times the string appears;

When the number of string types maintained by the dictionary reaches 10,000 (customizable), the dictionary is sorted from small to large by key , and then written into a small file, each line is key\tvalue;

Then clear the dictionary and continue reading until the large file is finished.

Merge rules:

First get the file descriptors of all small files, and then read out the first line (that is, each small file The string with the smallest ascii value of the file string) is compared.

Find the string with the smallest ascii value. If there are duplicates, add up the number of occurrences, and then store the current string and the total number of times into a list in memory.

Then move the read pointer of the file where the smallest string is located downward, that is, read another line from the corresponding small file for the next round of comparison.

When the number of lists in the memory reaches 10,000, the contents of the list are written to a final file at once and stored on the hard disk. At the same time, clear the list for subsequent comparisons.

Until all the small files have been read, the final file is a large file sorted in ascending order according to the string ascii value. The content of each line is the string\trepetition times,

The last iteration is to read the final file and find the one with the most repetitions.

1. Split

def readline_by_yield(bfile):
    with open(bfile, &#39;r&#39;) as rf:
        for line in rf:
            yield line

def split_bfile(bfile):
    count = 0
    d = {}
    for line in readline_by_yield(bfile):
        line = line.strip()
        if line not in d:
            d[line] = 0
        d[line] += 1
        if 10000 == len(d):
            text = &#39;&#39;
            for string in sorted(d):
                text += &#39;{}\t{}\n&#39;.format(string,d[string])
            with open(&#39;subfile/subfile{}.txt&#39;.format(count+1),&#39;w&#39;) as wf:
                wf.write(text.strip())
            d.clear()
            count += 1

    text = &#39;&#39;
    for string in sorted(d):
        text += &#39;{}\t{}\n&#39;.format(string,d[string])
    with open(&#39;subfile/subfile_end.txt&#39;,&#39;w&#39;) as wf:
        wf.write(text.strip())

split_bfile(&#39;bigfile.txt&#39;)

2. Merge

import os
import json
import time
import traceback

testdir = &#39;/ssd/subfile&#39;

now = time.time() 

# Step 1 : 获取全部文件描述符
fds = []
for f in os.listdir(testdir):
    ff = os.path.join(testdir,f)
    fds.append(open(ff,&#39;r&#39;))

# Step 2 : 每个文件获取第一行
tmp_strings = []
tmp_count = []
for fd in fds:
    line = fd.readline()
    string,count = line.strip().split(&#39;\t&#39;)
    tmp_strings.append(string)
    tmp_count.append(int(count))

# Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。
result = []
need2del = []

while True:
    min_str = min(tmp_strings)
    str_idx = [i for i,v in enumerate(tmp_strings) if v==min_str]
    str_count = sum([ int(tmp_count[idx]) for idx in str_idx ])
    result.append(&#39;{}\t{}\n&#39;.format(min_str,str_count))
    for idx in str_idx:
        next = fds[idx].readline()  # IndexError: list index out of range
        # 文件读完了
        if not next:
            need2del.append(idx)
        else:
            next_string,next_count = next.strip().split(&#39;\t&#39;)
            tmp_strings[idx] = next_string
            tmp_count[idx] = next_count
    # 暂存区保存10000个记录,一次性写入硬盘,然后清空继续读。
    if 10000 == len(result):
        with open(&#39;merged.txt&#39;,&#39;a&#39;) as wf:
            wf.write(&#39;&#39;.join(result))
        result[:] = []
    # 注意: 文件读完需要删除文件描述符的时候, 需要逆序删除
    need2del.reverse()
    for idx in need2del:
        del fds[idx]
        del tmp_strings[idx]
        del tmp_count[idx]
    need2del[:] = []
    if 0 == len(fds):
        break

with open(&#39;merged.txt&#39;,&#39;a&#39;) as wf:
    wf.write(&#39;&#39;.join(result))
result[:] = []

Merge result analysis:

Dictionary size maintained in memory during splitting Number of small files to be split Number of file descriptors to be maintained during merging Memory usage during merging Merge takes time
First time 10000 9000 9000 ~ 0 200M The merge speed is slow and the completion time has not yet been calculated
Second time 100000 900 900 ~ 0 27M The merge speed is fast, only 2572 seconds

3. Find the most frequent occurrences Strings and their times

import time

def read_line(filepath):
    with open(filepath,&#39;r&#39;) as rf:
        for line in rf:
            yield line

start_ts = time.time()

max_str = None
max_count = 0
for line in read_line(&#39;merged.txt&#39;):
    string,count = line.strip().split(&#39;\t&#39;)
    if int(count) > max_count:
        max_count = int(count)
        max_str = string

print(max_str,max_count)
print(&#39;Runtime {}&#39;.format(time.time()-start_ts))

The merged file has a total of 9999788 lines and a size of 256M; it takes 27 seconds to perform the search and occupies 6480KB of memory.

The above is the detailed content of One article to understand the problem of large file sorting/external memory sorting. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:cnblogs.com. If there is any infringement, please contact admin@php.cn delete