>백엔드 개발 >파이썬 튜토리얼 >대용량 파일 정렬/외장 메모리 정렬 문제를 이해하기 위한 기사 1개

대용량 파일 정렬/외장 메모리 정렬 문제를 이해하기 위한 기사 1개

藏色散人
藏色散人앞으로
2021-07-14 14:01:233773검색

질문 1: 파일에는 5억 줄이 포함되어 있으며 각 줄은 임의의 정수이며 파일의 모든 정수를 정렬해야 합니다.

Divide and Conquer(pide&Conquer), Reference빅데이터 알고리즘: 5억 데이터 정렬

이 total.txt를 500,000,000줄로 정렬합니다. 파일 크기는 4.6G입니다.

10,000줄을 읽을 때마다 정렬하고 새 하위 파일에 씁니다(여기서는 quick sort를 사용합니다).

1. 분할 및 정렬

#!/usr/bin/python2.7

import time

def readline_by_yield(bfile):
    with open(bfile, 'r') as rf:
        for line in rf:
            yield line

def quick_sort(lst):
    if len(lst) < 2:
        return lst
    pivot = lst[0]
    left = [ ele for ele in lst[1:] if ele < pivot ]
    right = [ ele for ele in lst[1:] if ele >= pivot ]
    return quick_sort(left) + [pivot,] + quick_sort(right)

def split_bfile(bfile):
    count = 0
    nums = []
    for line in readline_by_yield(bfile):
        num = int(line)
        if num not in nums:
            nums.append(num)
        if 10000 == len(nums):
            nums = quick_sort(nums)
            with open(&#39;subfile/subfile{}.txt&#39;.format(count+1),&#39;w&#39;) as wf:
                wf.write(&#39;\n&#39;.join([ str(i) for i in nums ]))
            nums[:] = []
            count += 1
            print count

now = time.time()
split_bfile(&#39;total.txt&#39;)
run_t = time.time()-now
print &#39;Runtime : {}&#39;.format(run_t)

은 50,000개의 작은 파일을 생성하며 각 작은 파일의 크기는 약 96K입니다.

프로그램 실행 중 메모리 사용량은 5424kB around

전체 파일 분할을 완료하는 데 94146 초가 걸렸습니다.

2. merge

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

import os
import time

testdir = &#39;/ssd/subfile&#39;

now = time.time() 

# Step 1 : 获取全部文件描述符
fds = []
for f in os.listdir(testdir):
    ff = os.path.join(testdir,f)
    fds.append(open(ff,&#39;r&#39;))

# Step 2 : 每个文件获取第一行,即当前文件最小值
nums = []
tmp_nums = []
for fd in fds:
    num = int(fd.readline())
    tmp_nums.append(num)

# Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。
count = 0
while 1:
    val = min(tmp_nums)
    nums.append(val)
    idx = tmp_nums.index(val)
    next = fds[idx].readline()
    # 文件读完了
    if not next:
        del fds[idx]
        del tmp_nums[idx]
    else:
        tmp_nums[idx] = int(next)
    # 暂存区保存1000个数,一次性写入硬盘,然后清空继续读。
    if 1000 == len(nums):
        with open(&#39;final_sorted.txt&#39;,&#39;a&#39;) as wf:
            wf.write(&#39;\n&#39;.join([ str(i) for i in nums ]) + &#39;\n&#39;)
        nums[:] = []
    if 499999999 == count:
        break
    count += 1
   
with open(&#39;runtime.txt&#39;,&#39;w&#39;) as wf:
    wf.write(&#39;Runtime : {}&#39;.format(time.time()-now))

프로그램 실행 중 메모리 사용량은 약 240M

약 38시간 정도 걸렸습니다. ..

메모리 사용량은 줄어들지만 시간 복잡도가 너무 높습니다. 파일 수를 줄이면 메모리 사용량을 더욱 줄일 수 있습니다(각 작은 파일에 저장되는 행 수가 늘어납니다).

질문 2: 파일에는 1000억 줄의 데이터가 있으며 각 줄은 IP 주소이며 IP 주소를 정렬해야 합니다.

IP 주소를 숫자로 변환

# 方法一:手动计算
 
In [62]: ip
Out[62]: &#39;10.3.81.150&#39;
 
In [63]: ip.split(&#39;.&#39;)[::-1]
Out[63]: [&#39;150&#39;, &#39;81&#39;, &#39;3&#39;, &#39;10&#39;]
 
In [64]: [ &#39;{}-{}&#39;.format(idx,num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1]) ]
Out[64]: [&#39;0-150&#39;, &#39;1-81&#39;, &#39;2-3&#39;, &#39;3-10&#39;]
 
In [65]: [256**idx*int(num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1])]
Out[65]: [150, 20736, 196608, 167772160]
 
In [66]: sum([256**idx*int(num) for idx,num in enumerate(ip.split(&#39;.&#39;)[::-1])])                     
Out[66]: 167989654 
In [67]:
 
# 方法二:使用C扩展库来计算
In [71]: import socket,struct
In [72]: socket.inet_aton(ip)
Out[72]: b&#39;\n\x03Q\x96&#39;
 
In [73]: struct.unpack("!I", socket.inet_aton(ip))
# !表示使用网络字节顺序解析, 后面的I表示unsigned int, 对应Python里的integer or long 
Out[73]: (167989654,)
 
In [74]: struct.unpack("!I", socket.inet_aton(ip))[0]
Out[74]: 167989654
 
In [75]: socket.inet_ntoa(struct.pack("!I", 167989654))              
Out[75]: &#39;10.3.81.150&#39;
 
In [76]:

질문 3: 1.3GB 파일이 있습니다(총 1억 줄). 파일에서 가장 많이 반복되는 문자열을 찾아보세요.

기본 아이디어: 대용량 파일을 반복적으로 읽고, 대용량 파일을 여러 개의 작은 파일로 분할하고, 마지막으로 이러한 작은 파일을 병합합니다.

분할 규칙:

대용량 파일을 반복적으로 읽고 메모리에 사전을 유지합니다. 키는 문자열이고 값은 문자열의 발생 횟수입니다.

사전에 유지되는 문자열 유형의 수입니다. 10,000에 도달하면(정의할 때 맞춤설정 가능) 사전을 키별로 작은 것부터 큰 것까지 정렬한 다음 작은 파일에 씁니다. 각 줄은 키 값입니다.

그런 다음 사전을 지우고 큰 파일이 나올 때까지 계속 읽으세요. 끝났습니다.

병합 규칙:

먼저 모든 작은 파일의 파일 설명자를 가져온 다음 비교를 위해 첫 번째 줄(즉, 각 작은 파일 문자열 중 가장 작은 ASCII 값을 가진 문자열)을 읽어보세요.

아스키 값이 가장 작은 문자열을 찾아 중복된 경우 발생 횟수를 더한 다음 현재 문자열과 총 횟수를 메모리 목록에 저장합니다.

그런 다음 가장 작은 문자열이 있는 파일의 읽기 포인터를 아래쪽으로 이동합니다. 즉, 다음 비교 라운드를 위해 해당 작은 파일에서 다른 줄을 읽습니다.

메모리에 있는 목록의 개수가 10,000개에 도달하면 목록의 내용이 최종 파일에 기록되어 하드디스크에 한번에 저장됩니다. 동시에 후속 비교를 위해 목록을 지웁니다.

작은 파일을 모두 읽을 때까지 최종 파일은 문자열 ASCII 값에 따라 오름차순으로 정렬된 큰 파일입니다. 각 줄의 내용은 문자열 t의 반복 횟수,

최종 반복입니다. 이 최종 파일의 경우 가장 많이 반복된 파일을 찾으세요.

1. 분할

def readline_by_yield(bfile):
    with open(bfile, &#39;r&#39;) as rf:
        for line in rf:
            yield line

def split_bfile(bfile):
    count = 0
    d = {}
    for line in readline_by_yield(bfile):
        line = line.strip()
        if line not in d:
            d[line] = 0
        d[line] += 1
        if 10000 == len(d):
            text = &#39;&#39;
            for string in sorted(d):
                text += &#39;{}\t{}\n&#39;.format(string,d[string])
            with open(&#39;subfile/subfile{}.txt&#39;.format(count+1),&#39;w&#39;) as wf:
                wf.write(text.strip())
            d.clear()
            count += 1

    text = &#39;&#39;
    for string in sorted(d):
        text += &#39;{}\t{}\n&#39;.format(string,d[string])
    with open(&#39;subfile/subfile_end.txt&#39;,&#39;w&#39;) as wf:
        wf.write(text.strip())

split_bfile(&#39;bigfile.txt&#39;)

2. 병합

import os
import json
import time
import traceback

testdir = &#39;/ssd/subfile&#39;

now = time.time() 

# Step 1 : 获取全部文件描述符
fds = []
for f in os.listdir(testdir):
    ff = os.path.join(testdir,f)
    fds.append(open(ff,&#39;r&#39;))

# Step 2 : 每个文件获取第一行
tmp_strings = []
tmp_count = []
for fd in fds:
    line = fd.readline()
    string,count = line.strip().split(&#39;\t&#39;)
    tmp_strings.append(string)
    tmp_count.append(int(count))

# Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。
result = []
need2del = []

while True:
    min_str = min(tmp_strings)
    str_idx = [i for i,v in enumerate(tmp_strings) if v==min_str]
    str_count = sum([ int(tmp_count[idx]) for idx in str_idx ])
    result.append(&#39;{}\t{}\n&#39;.format(min_str,str_count))
    for idx in str_idx:
        next = fds[idx].readline()  # IndexError: list index out of range
        # 文件读完了
        if not next:
            need2del.append(idx)
        else:
            next_string,next_count = next.strip().split(&#39;\t&#39;)
            tmp_strings[idx] = next_string
            tmp_count[idx] = next_count
    # 暂存区保存10000个记录,一次性写入硬盘,然后清空继续读。
    if 10000 == len(result):
        with open(&#39;merged.txt&#39;,&#39;a&#39;) as wf:
            wf.write(&#39;&#39;.join(result))
        result[:] = []
    # 注意: 文件读完需要删除文件描述符的时候, 需要逆序删除
    need2del.reverse()
    for idx in need2del:
        del fds[idx]
        del tmp_strings[idx]
        del tmp_count[idx]
    need2del[:] = []
    if 0 == len(fds):
        break

with open(&#39;merged.txt&#39;,&#39;a&#39;) as wf:
    wf.write(&#39;&#39;.join(result))
result[:] = []

병합 결과 분석:

분할 시 메모리에 유지되는 사전 크기 분할할 작은 파일 수 병합 시 유지할 파일 설명 기호 수 병합 중 메모리 사용량 병합 시간
처음 10000 9000 9000 ~ 0 200M 병합 속도가 느리고 완료 시간이 계산되지 않았습니다. 아직
두 번째 100000 900 900 ~ 0 27m 은 2572초로 빠릅니다.

3. 초 수

import time

def read_line(filepath):
    with open(filepath,&#39;r&#39;) as rf:
        for line in rf:
            yield line

start_ts = time.time()

max_str = None
max_count = 0
for line in read_line(&#39;merged.txt&#39;):
    string,count = line.strip().split(&#39;\t&#39;)
    if int(count) > max_count:
        max_count = int(count)
        max_str = string

print(max_str,max_count)
print(&#39;Runtime {}&#39;.format(time.time()-start_ts))

그리고 반환됩니다. 최종 파일은 총 9999788라인이고 크기는 256M입니다. 검색을 수행하는 데 27초가 걸리고 6480KB의 메모리를 차지합니다.

위 내용은 대용량 파일 정렬/외장 메모리 정렬 문제를 이해하기 위한 기사 1개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 cnblogs.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제