Home >Backend Development >Python Tutorial >Teach you to write a database with more than 100 lines

Teach you to write a database with more than 100 lines

高洛峰
高洛峰Original
2016-10-18 14:40:201383browse

This article introduces a simple database written for Chinese IT veterans. It is not as powerful as the database we use, but it is worth learning from. It can be used in specific environments and is more flexible and convenient.

The name of the database is WawaDB, which is implemented in python. This shows that python is very powerful!

Introduction

The requirements for logging are generally as follows:

Only append, not modify, writes are written in chronological order;

A large amount of writing, a small amount of reading, and the query generally queries the data of a time period;

MongoDB The fixed collection of MongoDB satisfies this requirement very well, but MongoDB occupies a relatively large amount of memory, which makes it feel like making a fuss out of a molehill.

The idea of ​​WawaDB is to record the current time and the offset of the log file in an index file every time 1,000 logs are written.

Then when querying the log by time, first load the index into the memory, use the dichotomy method to find the offset of the time point, and then open the log file and seek to the specified location, so that the data the user needs can be quickly located and read. fetch without traversing the entire log file.

Performance

Core 2 P8400, 2.26GHZ, 2G memory, 32 bit win7

Write test:

Simulate writing 10,000 pieces of data in 1 minute, write 5 hours of data in total, insert 3 million pieces of data, Each piece of data has 54 characters, and it takes 2 minutes and 51 seconds


Reading test: Read logs containing a certain substring within a specified time period

Data range traversal data volume result number time (seconds)

5 hours 3 million 604 6.6

2 hours 1.2 million 225 2.7

1 hour 600,096 1.3

30 minutes 300,044 0.6

Index

Only index the time recorded in the log. The introduction probably mentioned the index. In implementation, binary search is definitely not as efficient as B Tree, but in general it is not an order of magnitude worse, and the implementation is very simple.

Because it is a sparse index, not every log has an index to record its offset. Therefore, when reading data, you need to read more data forward to prevent missed reading, and wait until you read the actual required data. Return data to the user.

As shown below, for example, if the user wants to read the logs from 25 to 43, use the dichotomy method to find 25, and find the point where 30 is located,

Index: 0 10 20 30 40 50 Log: |..... ..|..........|..........|..........|..........|>>>a = [0 , 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>> >5>>>a[5]>>50

So we have to go forward a little and start reading the log from 20 (the tick before 30). After reading 21, 22, 23, 24 because it is larger than 25 Small, so throw it away. After reading 25, 26, 27,..., it will be returned to the user

After reading 40 (the previous scale of 50), it is necessary to determine whether the current data is greater than 43. If it is greater than 43 (return data in the fully open range), it is time to stop reading.

Overall, we only operated a small part of the large file and got the data the user wanted.

Buffer

In order to reduce a large number of disk writes when writing logs, the buffer is set to 10k when indexing in the append log. The system default should be 4k.

Similarly, in order to improve the efficiency of reading logs, the read buffer is also set to 10k, and it also needs to be adjusted appropriately according to the size of your logs.

The read and write of the index is set to a row buffer, and every full row must be flushed to the disk to prevent incomplete index rows from being read (in fact, practice has proven that even with a row buffer set, half-drawn rows can still be read).

Inquiry

What? To support SQL, stop making trouble, how can 100 lines of code support SQL?

Now the query is directly passed in a lambda expression. When the system traverses the data rows within the specified time range, it will be returned to the user only if the user's lambda conditions are met.

Of course, this will read a lot of data that the user does not need, and each line must be calculated with a lambda expression, but there is no way, simplicity is beautiful.

In the past, I recorded a query condition, log time, and log file offset in the index, so that the offset that meets the conditions can be found from the index, and then each piece of data is searched once in the log file. read once. This has only one advantage, which is that the amount of data read is less, but there are two disadvantages:

The index file is very large and it is inconvenient to load into the memory.

You have to seek first every time you read it, and it seems that the buffer is not used. , extremely slow, four or five times slower than continuously reading a segment of data and using lambda filtering

Writing

As mentioned before, it only appends and does not modify the data, and the front of each log line is a timestamp.

Multi-threading


Query data can be queried by multiple threads at the same time. Each query will open a new log file descriptor, so multiple parallel reads will not fight.

For writing, although it is just an append operation, it is not confirmed whether it is safe for multiple threads to append the file, so it is recommended to use a queue and a dedicated thread for writing.

Locks

There are no locks.

Sort

By default, the queried data is arranged in chronological order. If you need other sorting, you can sort it using python's sorted function after getting it into the memory. You can sort it however you want.


More than 100 lines of database code

# -*- coding:utf-8 -*-
import os
import time
import bisect
import itertools
from datetime import datetime
import logging
  
default_data_dir = './data/'
default_write_buffer_size = 1024*10
default_read_buffer_size = 1024*10
default_index_interval = 1000
  
def ensure_data_dir():
    if not os.path.exists(default_data_dir):
        os.makedirs(default_data_dir)
  
def init():
    ensure_data_dir()
  
class WawaIndex:
    def __init__(self, index_name):
        self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1)
        self.indexes, self.offsets, self.index_count = [], [], 0
        self.__load_index()
  
    def __update_index(self, key, offset):
        self.indexes.append(key)
        self.offsets.append(offset)
  
    def __load_index(self):
        self.fp_index.seek(0)
        for line in self.fp_index:
            try:
                key, offset  = line.split()
                self.__update_index(key, offset)
            except ValueError: # 索引如果没有flush的话,可能读到有半行的数据
                pass
  
    def append_index(self, key, offset):
        self.index_count += 1
        if self.index_count % default_index_interval == 0:
            self.__update_index(key, offset)
            self.fp_index.write('%s %s %s' % (key, offset, os.linesep))
  
    def get_offsets(self, begin_key, end_key):
        left = bisect.bisect_left(self.indexes, str(begin_key))
        right = bisect.bisect_left(self.indexes, str(end_key))
        left, right = left - 1, right - 1
        if left < 0: left = 0
        if right < 0: right = 0
        if right > len(self.indexes) - 1: right = len(self.indexes) - 1
        logging.debug(&#39;get_index_range:%s %s %s %s %s %s&#39;, self.indexes[0], self.indexes[-1], begin_key, end_key, left, right)
        return self.offsets[left], self.offsets[right]
  
  
class WawaDB:
    def __init__(self, db_name):
        self.db_name = db_name
        self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + &#39;.db&#39;), &#39;a&#39;, default_write_buffer_size)
        self.index = WawaIndex(db_name)
  
    def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset):
        fp_data = open(os.path.join(default_data_dir, self.db_name + &#39;.db&#39;), &#39;r&#39;, default_read_buffer_size)
        fp_data.seek(int(begin_offset))
          
        line = fp_data.readline()
        find_real_begin_offset = False
        will_read_len, read_len = int(end_offset) - int(begin_offset), 0
        while line:
            read_len += len(line)
            if (not find_real_begin_offset) and  (line < str(begin_key)):
                line = fp_data.readline()
                continue
            find_real_begin_offset = True
            if (read_len >= will_read_len) and (line > str(end_key)): break
            yield line.rstrip(&#39;\r\n&#39;)
            line = fp_data.readline()
  
    def append_data(self, data, record_time=datetime.now()):
        def check_args():
            if not data:
                raise ValueError(&#39;data is null&#39;)
            if not isinstance(data, basestring):
                raise ValueError(&#39;data is not string&#39;)
            if data.find(&#39;\r&#39;) != -1 or data.find(&#39;\n&#39;) != -1:
                raise ValueError(&#39;data contains linesep&#39;)
  
        check_args()
          
        record_time = time.mktime(record_time.timetuple())
        data = &#39;%s %s %s&#39; % (record_time, data, os.linesep)
        offset = self.fp_data_for_append.tell()
        self.fp_data_for_append.write(data)
        self.index.append_index(record_time, offset)
  
    def get_data(self, begin_time, end_time, data_filter=None):
        def check_args():
            if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)):
                raise ValueError(&#39;begin_time or end_time is not datetime&#39;)
  
        check_args()
  
        begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple())
        begin_offset, end_offset = self.index.get_offsets(begin_time, end_time)
  
        for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset):
            if data_filter:
                if data_filter(data):
                    yield data
            else:
                yield data
  
def test():
    from datetime import datetime, timedelta
    import uuid, random
    logging.getLogger().setLevel(logging.NOTSET)
  
    def time_test(test_name):
        def inner(f):
            def inner2(*args, **kargs):
                start_time = datetime.now()
                result = f(*args, **kargs)
                print &#39;%s take time:%s&#39; % (test_name, (datetime.now() - start_time))
                return result
            return inner2
        return inner
  
    @time_test(&#39;gen_test_data&#39;)   
    def gen_test_data(db):
        now = datetime.now()
        begin_time = now - timedelta(hours=5)
        while begin_time < now:
            print begin_time
            for i in range(10000):
                db.append_data(str(random.randint(1,10000))+ &#39; &#39; +str(uuid.uuid1()), begin_time)
            begin_time += timedelta(minutes=1)
      
    @time_test(&#39;test_get_data&#39;)   
    def test_get_data(db):
        begin_time = datetime.now() - timedelta(hours=3)
        end_time = begin_time + timedelta(minutes=120)
        results = list(db.get_data(begin_time, end_time, lambda x: x.find(&#39;1024&#39;) != -1))
        print &#39;test_get_data get %s results&#39; % len(results)
  
    @time_test(&#39;get_db&#39;)   
    def get_db():
        return WawaDB(&#39;test&#39;)
  
    if not os.path.exists(&#39;./data/test.db&#39;):
        db = get_db()
        gen_test_data(db)
        #db.index.fp_index.flush()
    
    db = get_db()
    test_get_data(db)
  
init()
  
if __name__ == &#39;__main__&#39;:
    test()


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn