Home >Backend Development >Python Tutorial >使用Python进行稳定可靠的文件操作详解

使用Python进行稳定可靠的文件操作详解

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOriginal
2016-06-06 11:28:501241browse

考虑下述Python代码片段。对文件中的数据进行某些操作,然后将结果保存回文件中:

代码如下:


with open(filename) as f:
   input = f.read()
output = do_something(input)
with open(filename, 'w') as f:
   f.write(output)

看起来很简单吧?可能看起来并不像乍一看这么简单。我在产品服务器中调试应用,经常会出现奇怪的行为。
这是我看过的失效模式的例子:
失控的服务器进程溢出大量日志,磁盘被填满。write()在截断文件之后抛出异常,文件将会变成空的。
应用的几个实例并行执行。在各个实例结束之后,因为混合了多个实例的输出,文件内容最终变成了天书。
在完成了写操作之后,应用会触发一些后续操作。几秒钟后断电。在我们重启了服务器之后,我们再一次看到了旧的文件内容。已经传递给其它应用的数据与我们在文件中看到的不再一致。
下面没有什么新的内容。本文的目的是为在系统编程方面缺少经验的Python开发者提供常见的方法和技术。我将会提供代码例子,使得开发者可以很容易的将这些方法应用到自己的代码中。
“可靠性”意味着什么?
广义的讲,可靠性意味着在所有规定的条件下操作都能执行它所需的函数。至于文件的操作,这个函数就是创建,替换或者追加文件的内容的问题。这里可以从数据库理论上获得灵感。经典的事务模型的ACID性质作为指导来提高可靠性。
开始之前,让我们先看看我们的例子怎样和ACID4个性质扯上关系:
原子性(Atomicity)要求这个事务要么完全成功,要么完全失败。在上面的实例中,磁盘满了可能导致部分内容写入文件。另外,如果正当在写入内容时其它程序又在读取文件,它们可能获得是部分完成的版本,甚至会导致写错误
一致性(Consistency) 表示操作必须从系统的一个状态到另一个状态。一致性可以分为两部分:内部和外部一致性。内部一致性是指文件的数据结构是一致的。外部一致性是指文件的内容与它相关的数据是相符合的。在这个例子中,因为我们不了解这个应用,所以很难推断是否符合一致性。但是因为一致性需要原子性,我们至少可以说没有保证内部一致性。
隔离性(Isolation)如果在并发的执行事务中,多个相同的事务导致了不同的结果,就违反了隔离性。很明显上面的代码对操作失败或者其它隔离性失败都没有保护。
持久性(Durability)意味着改变是持久不变的。在我们告诉用户成功之前,我们必须确保我们的数据存储是可靠的并且不只是一个写缓存。上面的代码已经成功写入数据的前提是假设我们调用write()函数,磁盘I/O就立即执行。但是POSIX标准是不保证这个假设的。
尽可能使用数据库系统
如果我们能够获得ACID 四个性质,那么我们增加可靠性方面取得了长远发展。但是这需要很大的编码功劳。为什么重复发明轮子?大多数数据库系统已经有ACID事务了。
可靠性数据存储已经是一个已解决的问题。如果你需要可靠性存储,请使用数据库。很可能,没有几十年的功夫,你自己解决这方面的能力没有那些已经专注这方面好些年的人好。如果你不想安装一个大数据库服务器,那么你可以使用sqlite,它具有ACID事务,很小,免费的,而且它包含在Python的标准库中。
文章本该在这里就结束的,但是还有一些有根有据的原因,就是不使用数据。它们通常是文件格式或者文件位置约束。这两个在数据库系统中都不好控制。理由如下:
我们必须处理其它应用产生的固定格式或者在固定位置的文件,
我们必须为了其它应用的消耗而写文件(和应用了同样的限制条件)
我们的文件必须方便人阅读或者修改。

如果我们自己动手实现可靠的文件更新,那么这里有一些编程技术供参考。下面我将展示四种常见的操作文件更新模式。在那之后,我会讨论采取哪些步骤在每个文件更新模式下满足ACID性质。
文件更新模式
文件可以以多种方式更新,但是我认为至少有四种常见的模式。这四种模式将做为本文剩余部分的基础。
截断-写
这可能是最基本的模式。在下述例子中,假设的域模型代码读数据,执行一些计算,然后以写模式重新打开存在的文件:

代码如下:


with open(filename, 'r') as f:
   model.read(f)
model.process()
with open(filename, 'w') as f:
   model.write(f)

此模式的一个变种以读写模式打开文件(Python中的“加”模式),寻找到开始的位置,显式调用truncate(),重写文件内容。

代码如下:


with open(filename, 'a+') as f:
   f.seek(0)
   model.input(f.read())
   model.compute()
   f.seek(0)
   f.truncate()
   f.write(model.output())

该变种的优势是只打开文件一次,始终保持文件打开。举例来说,这样可以简化加锁。
写-替换
另外一种广泛使用的模式是将新内容写到临时文件,之后替换原始文件:

代码如下:


with tempfile.NamedTemporaryFile(
      'w', dir=os.path.dirname(filename), delete=False) as tf:
   tf.write(model.output())
   tempname = tf.name
os.rename(tempname, filename)

该方法与截断-写方法相比对错误更具有鲁棒性。请看下面对原子性和一致性的讨论。很多应用使用该方法。
这两个模式很常见,以至于linux内核中的ext4文件系统甚至可以自动检测到这些模式,自动修复一些可靠性缺陷。但是不要依赖这一特性:你并不是总是使用ext4,而且管理员可能会关掉这一特性。
追加

第三种模式就是追加新数据到已存在的文件:

代码如下:


with open(filename, 'a') as f:
   f.write(model.output())

这个模式用来写日志文件和其它累积处理数据的任务。从技术上讲,它的显著特点就是极其简单。一个有趣的扩展应用就是常规操作中只通过追加操作更新,然后定期重新整理文件,使之更紧凑。
Spooldir
这里我们将目录做为逻辑数据存储,为每条记录创建新的唯一命名的文件:

代码如下:


with open(unique_filename(), 'w') as f:
   f.write(model.output())

该模式与附加模式一样具有累积的特点。一个巨大的优势是我们可以在文件名中放入少量元数据。举例来说,这可以用于传达处理状态的信息。spooldir模式的一个特别巧妙的实现是maildir格式。maildirs使用附加子目录的命名方案,以可靠的、无锁的方式执行更新操作。md和gocept.filestore库为maildir操作提供了方便的封装。
如果你的文件名生成不能保证唯一的结果,甚至有可能要求文件必须实际上是新的。那么调用具有合适标志的低等级os.open():

代码如下:


fd = os.open(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL, 0o666)
with os.fdopen(fd, 'w') as f:
   f.write(...)

在以O_EXCL方式打开文件后,我们用os.fdopen将原始的文件描述符转化为普通的Python文件对象。
应用ACID属性到文件更新
下面,我将尝试加强文件更新模式。反过来让我们看看可以做些什么来满足ACID属性。我将会尽可能保持简单,因为我们并不是要写一个完整的数据库系统。请注意本节的材料并不彻底,但是可以为你自己的实验提供一个好的起点。
原子性
写-替换模式提供了原子性,因为底层的os.rename()是原子性的。这意味着在任意给定时间点,进程或者看到旧的文件,或者看到新的文件。该模式对写错误具有天然的鲁棒性:如果写操作触发异常,重命名操作就不会被执行,所有就没有用损坏的新文件覆盖正确的旧文件的风险。
附加模式并不是原子性的,因为有附加不完整记录的风险。但是有个技巧可以使更新具有原子性:为每个写操作标注校验和。之后读日志的时候,忽略所有没有有效校验和的记录。以这种方式,只有完整的记录才会被处理。在下面的例子中,应用做周期性的测量,每次在日志中附加一行JSON记录。我们计算记录的字节表示形式的CRC32校验和,然后附加到同一行:

代码如下:


with open(logfile, 'ab') as f:
    for i in range(3):
        measure = {'timestamp': time.time(), 'value': random.random()}
        record = json.dumps(measure).encode()
        checksum = '{:8x}'.format(zlib.crc32(record)).encode()
        f.write(record + b' ' + checksum + b'\n')

该例子代码通过每次创建随机值模拟测量。

代码如下:


$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937

想要处理这个日志文件,我们每次读一行记录,分离校验和,与读到的记录比较。

代码如下:


with open(logfile, 'rb') as f:
    for line in f:
        record, checksum = line.strip().rsplit(b' ', 1)
        if checksum.decode() == '{:8x}'.format(zlib.crc32(record)):
            print('read measure: {}'.format(json.loads(record.decode())))
        else:
            print('checksum error for record {}'.format(record))


现在我们通过截断最后一行模拟被截断的写操作:

代码如下:


$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.23202

当读日志的时候,最后不完整的一行被拒绝:

代码如下:


$ read_checksummed_log.py log
read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828}
read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424}
checksum error for record b'{"timestamp": 1373396987.258291, "value":'

添加校验和到日志记录的方法被用于大量应用,包括很多数据库系统。
spooldir中的单个文件也可以在每个文件中添加校验和。另外一个可能更简单的方法是借用写-替换模式:首先将文件写到一边,然后移到最终的位置。设计一个保护正在被消费者处理的文件的命名方案。在下面的例子中,所有以.tmp结尾的文件都会被读取程序忽略,因此在写操作的时候可以安全的使用。

代码如下:


newfile = generate_id()
with open(newfile + '.tmp', 'w') as f:
   f.write(model.output())
os.rename(newfile + '.tmp', newfile)

最后,截断-写是非原子性的。很遗憾我不能提供满足原子性的变种。在执行完截取操作后,文件是空的,还没有新内容写入。如果并发的程序现在读文件或者有异常发生,程序中止,我们既看不久的版本也看不到新的版本。
一致性
我谈论的关于原子性的大部分内容也可以应用到一致性。实际上,原子性更新是内部一致性的前提条件。外部一致性意味着同步更新几个文件。这不容易做到,锁文件可以用来确保读写访问互不干涉。考虑某目录下的文件需要互相保持一致。常用的模式是指定锁文件,用来控制对整个目录的访问。
写程序的例子:

代码如下:


with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_EX)
   model.update(dirname)

读程序的例子:

代码如下:


with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_SH)
   model.readall(dirname)

该方法只有控制所有读程序才生效。因为每次只有一个写程序活动(独占锁阻塞所有共享锁),所有该方法的可扩展性有限。
更进一步,我们可以对整个目录应用写-替换模式。这涉及为每次更新创建新的目录,更新完成后改变符合链接。举例来说,镜像应用维护一个包含压缩包和列出了文件名、文件大小和校验和的索引文件的目录。当上流的镜像更新,仅仅隔离地对压缩包和索引文件进项原子性更新是不够的。相反,我们需要同时提供压缩包和索引文件以免校验和不匹配。为了解决这个问题,我们为每次生成维护一个子目录,然后改变符号链接激活该次生成。

代码如下:


mirror
|-- 483
|   |-- a.tgz
|   |-- b.tgz
|   `-- index.json
|-- 484
|   |-- a.tgz
|   |-- b.tgz
|   |-- c.tgz
|   `-- index.json
`-- current -> 483

新的生成484正在被更新的过程中。当所有压缩包准备好,索引文件更新后,我们可以用一次原子调用os.symlink()来切换current符号链接。其它应用总是或者看到完全旧的或者完全新的生成。读程序需要使用os.chdir()进入current目录,很重要的是不要用完整路径名指定文件。否在当读程序打开current/index.json,然后打开current/a.tgz,但是同时符号链接已经改变时就会出现竞争条件。
隔离性
隔离性意味着对同一文件的并发更新是可串行化的——存在一个串行调度使得实际执行的并行调度返回相同的结果。“真实的”数据库系统使用像MVCC这种高级技术维护可串行性,同时允许高等级的可并行性。回到我们的场景,我们最后使用加锁来串行文件更新。
对截断-写更新进行加锁是容易的。仅仅在所有文件操作前获取一个独占锁就可以。下面的例子代码从文件中读取一个整数,然后递增,最后更新文件:

代码如下:


def update():
   with open(filename, 'r+') as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      f.seek(0)
      f.truncate()
      f.write('{}\n'.format(n))

使用写-替换模式加锁更新就有点儿麻烦啦。像 截断-写那样使用锁可能导致更新冲突。某个幼稚的实现可能看起来像这样

代码如下:


def update():
   with open(filename) as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      with tempfile.NamedTemporaryFile(
            'w', dir=os.path.dirname(filename), delete=False) as tf:
         tf.write('{}\n'.format(n))
         tempname = tf.name
      os.rename(tempname, filename)

这段代码有什么问题呢?设想两个进程竞争更新某个文件。第一个进程运行在前面,但是第二个进程阻塞在fcntl.flock()调用。当第一个进程替换了文件,释放了锁,现在在第二个进程中打开的文件描述符指向了一个包含旧内容的“幽灵”文件(任意路径名都不可达)。想要避免这个冲突,我们必须检查打开的文件是否与fcntl.flock()返回的相同。所以我写了一个新的LockedOpen上下文管理器来替换内建的open上下文。来确保我们实际打开了正确的文件:

代码如下:


class LockedOpen(object):

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.fileobj.close()

给追加更新上锁如同给截断-写更新上锁一样简单:需要一个排他锁,然后追加就完成了。需要长期运行的会将文件长久的打开的进程,可以在更新时释放锁,让其它进入。
spooldir模式有个很优美的性质就是它不需要任何锁。此外,你建立在使用灵活的命名模式和一个健壮的文件名分代。邮件目录规范就是一个spooldir模式的好例子。它可以很容易的适应其它情况,不仅仅是处理邮件。
持久性
持久性有点特殊,因为它不仅依赖于应用,也与OS和硬件配置有关。理论上来说,我们可以假定,如果数据没有到达持久存储,os.fsync()或os.fdatasync()调用就没有返回结果。在实际情况中,我们有可能会遇到几个问题:我们可能会面对不完整的fsync实现,或者糟糕的磁盘控制器配置,它们都无法提供任何持久化的保证。有一个来自 MySQL 开发者 的讨论对哪里会发生错误进行了详尽的讨论。有些像PostgreSQL 之类的数据库系统,甚至提供了持久化机制的选择 ,以便管理员在运行时刻选择最佳的一个。然而不走运的人只能使用os.fsync(),并期待它可以被正确的实现。
通过截断-写模式,在结束写操作以后关闭文件以前,我们需要发送一个同步信号。注意通常这还牵涉到另一个层次的写缓存。glibc 缓存 甚至会在写操作传递到内核以前,在进程内部拦住它。同样为了得到空的glibc缓存,我们需要在同步以前对它flush():

代码如下:


with open(filename, 'w') as f:
   model.write(f)
   f.flush()
   os.fdatasync(f)

要不,你也可以带参数-u调用Python,以此为所有的文件I/O获得未缓冲的写。
大多数时候相较os.fsync()我更喜欢os.fdatasync(),以此避免同步元数据的更新(所有权、大小、mtime…)。元数据的更新可最终导致磁盘I/O搜索操作,这会使整个过程慢不少。
对写-替换风格更新使用同样的技巧只是成功了一半。我们得确保在代替旧文件之前,新写入文件的内容已经写入了非易失性存储器上了,但是替换操作怎么办?我们不能保证那个目录更新是否执行的刚刚好。在网络上有很多关于怎么让同步目录更新的长篇大论。但是在我们这种情况,旧文件和新文件都在同一个目录下,我们可以使用简单的解决方案来逃避这个这题。

代码如下:


os.rename(tempname, filename)
dirfd = os.open(os.path.dirname(filename), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)

我们调用底层的os.open()来打开目录(Python自带的open()方法不支持打开目录),然后在目录文件描述符上执行os.fsync()。
对待追加更新和我以及说过的截断-写是相似的。
spooldir模式与写-替换模式同样的目录同步问题。幸运地是,可以使用同样的解决方案:第一步同步文件,然后同步目录。

总结
这使可靠的更新文件成为可能。我已经演示了满足ACID的四大性质。这些展示的实例代码充当一个工具箱。掌握这编程技术最大的满足你的需求。有时,你并不需要满足所有的ACID性质,可能仅仅需要一到两个。我希望这篇文章可以帮助你去做已充分了解的决定,什么该去实现以及什么该舍弃。

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn