Heim  >  Artikel  >  Backend-Entwicklung  >  Bringen Sie Ihnen bei, eine Datenbank mit mehr als 100 Zeilen zu schreiben

Bringen Sie Ihnen bei, eine Datenbank mit mehr als 100 Zeilen zu schreiben

高洛峰
高洛峰Original
2016-10-18 14:40:201330Durchsuche

In diesem Artikel wird eine einfache Datenbank vorgestellt, die für chinesische IT-Veteranen geschrieben wurde. Sie ist nicht so leistungsfähig wie die von uns verwendete Datenbank, aber es lohnt sich, daraus zu lernen. Es kann in bestimmten Umgebungen verwendet werden und ist flexibler und bequemer.

Der Name der Datenbank ist WawaDB, die in Python implementiert ist. Dies zeigt, dass Python sehr mächtig ist!

Einführung

Die Anforderungen für die Protokollierung lauten im Allgemeinen wie folgt:

Nur ​​anhängen, nicht ändern, in chronologischer Reihenfolge schreiben; Bei einer kleinen Lesemenge fragt die Abfrage im Allgemeinen die Daten eines Zeitraums ab.

Die feste Sammlung von MongoDB kann diese Anforderung sehr gut erfüllen, aber MongoDB belegt relativ viel Speicher, was das Gefühl vermittelt, eine zu erstellen Aufregung aus einem Maulwurfshügel.

Die Idee von WawaDB besteht darin, jedes Mal, wenn 1000 Protokolle geschrieben werden, die aktuelle Zeit und den Offset der Protokolldatei in einer Indexdatei aufzuzeichnen.

Wenn Sie dann das Protokoll nach Zeit abfragen, laden Sie zuerst den Index in den Speicher, ermitteln Sie mit der Dichotomiemethode den Versatz des Zeitpunkts, öffnen Sie dann die Protokolldatei und suchen Sie nach dem angegebenen Speicherort Die vom Benutzer benötigten Daten können schnell lokalisiert und gelesen werden, ohne die gesamte Protokolldatei zu durchlaufen.

Leistung

Core 2 P8400, 2,26 GHz, 2G Speicher, 32 Bit Win7

Schreibtest:

Simulieren Sie das Schreiben von 10.000 Daten in 1 Minute Es wurden insgesamt 5 Stunden Daten geschrieben, 3 Millionen Datenelemente eingefügt, jedes Datenelement bestand aus 54 Zeichen und es dauerte 2 Minuten und 51 Sekunden


Lesetest: Lesen Sie bestimmte Protokolle, die eine bestimmte Teilzeichenfolge enthalten, im Zeitraum

Datenbereichsdurchlauf, Datenvolumen, Ergebniszählzeit (Sekunden)

5 Stunden 3 Millionen 604 6,6

2 Stunden 1,2 Millionen 225 2,7

1 Stunde 600.000 96 1,3

30 Minuten 300.000 44 0,6

Index

Indizieren Sie nur die im Protokoll aufgezeichnete Zeit, als In der Einleitung heißt es grob: In Bezug auf die Implementierung des Index ist die binäre Suche definitiv nicht so effizient wie B Tree, aber im Allgemeinen unterscheidet sie sich nicht um eine Größenordnung und die Implementierung ist sehr einfach.

Da es sich um einen spärlichen Index handelt, verfügt nicht jedes Protokoll über einen Index zum Aufzeichnen seines Offsets. Wenn Sie also Daten lesen, müssen Sie einige weitere Daten vorwärts lesen, um Lesefehler zu vermeiden, und warten, bis Sie gelesen haben, was Sie wirklich lesen Die Daten werden dann tatsächlich an den Benutzer zurückgegeben.

Wie unten gezeigt, möchte der Benutzer beispielsweise die Protokolle von 25 bis 43 lesen, die Dichotomiemethode verwenden, um 25 zu finden, und den Punkt finden, an dem sich 30 befindet,

Index : 0 10 20 30 40 50 Protokoll: |..........|..........|..........|....... ...|.......... |>>>>a = [0, 10, 20, 30, 40, 50]>>>>bisect.bisect_left(a, 35)>>>> 3>>>>a[3]>>>>30>>>>bisect_left(a, 43)>>>5>>>a[5]>>50

Also müssen wir Gehen Sie ein wenig vorwärts und beginnen Sie mit dem Lesen des Protokolls ab 20 (das Häkchen vor 30). 21, 22, 23 und 24 werden gelesen und weggeworfen, weil sie kleiner als 25 sind. Nach dem Lesen von 25, 26, 27,..., Sie werden an den Benutzer zurückgegeben

Bis 40 lesen (der vorherige Tick von 50) Dann muss festgestellt werden, ob die aktuellen Daten größer als 43 sind. Wenn sie größer als 43 sind (Rückgabedaten im vollständig geöffneten Zustand). Bereich), ist es notwendig, mit dem Lesen aufzuhören.

Insgesamt haben wir nur einen kleinen Teil der großen Datei bearbeitet, um die vom Benutzer gewünschten Daten zu erhalten.

Puffer

Um eine große Anzahl von Festplattenschreibvorgängen beim Schreiben von Protokollen zu reduzieren, wird der Puffer beim Anhängen des Protokolls auf 10 KB eingestellt. Der Systemstandard sollte 4 KB betragen.

Um die Effizienz beim Lesen von Protokollen zu verbessern, ist der Lesepuffer ebenfalls auf 10 KB eingestellt und muss entsprechend der Größe Ihres Protokolls angepasst werden.

Das Lesen und Schreiben des Index ist auf einen Zeilenpuffer eingestellt, und jede vollständige Zeile muss auf die Festplatte geleert werden, um zu verhindern, dass unvollständige Indexzeilen gelesen werden (tatsächlich hat die Praxis dies auch bei einer Zeile bewiesen). Puffer gesetzt, halbgelesene Zeilen können weiterhin gelesen werden) OK).

Abfrage

Was? Um SQL zu unterstützen, machen Sie keine Probleme mehr. Wie können 100 Codezeilen SQL unterstützen?

Jetzt wird die Abfrage direkt in einem Lambda-Ausdruck übergeben. Wenn das System die Datenzeilen innerhalb des angegebenen Zeitbereichs durchläuft, wird sie nur dann an den Benutzer zurückgegeben, wenn die Lambada-Bedingungen des Benutzers erfüllt sind.

Natürlich werden dadurch viele Daten gelesen, die der Benutzer nicht benötigt, und jede Zeile muss mit einem Lambda-Ausdruck berechnet werden, aber das geht nicht, Einfachheit ist schön.

In der Vergangenheit habe ich eine Bedingung, die abgefragt werden musste, die Protokollzeit und den Offset der Protokolldatei im Index aufgezeichnet, damit ich den Offset finden konnte, der die Bedingungen aus dem Index erfüllt, und dann jedes Stück davon Daten würden wie folgt aussehen: Einmal in der Protokolldatei suchen. Dies hat nur einen Vorteil: Die gelesene Datenmenge ist geringer, es gibt jedoch zwei Nachteile:

Die Indexdatei ist sehr groß und lässt sich nur schwer in den Speicher laden

Alle Wenn es gelesen wird, muss es zuerst gelesen werden. Es scheint, dass der Puffer nicht verwendet wird. Es ist sehr langsam, vier- oder fünfmal langsamer als das kontinuierliche Lesen eines Datensegments und das Filtern mit Lambda

Schreiben

Wie ich bereits sagte, hängen Sie die Daten nur an, ändern Sie sie nicht, und am Anfang jeder Protokollzeile steht ein Zeitstempel.

Multithreading


Abfragedaten können von mehreren Threads gleichzeitig abgefragt werden. Bei jeder Abfrage wird ein neuer Protokolldateideskriptor geöffnet mehr parallel. Die Lesevorgänge werden nicht kämpfen.

Obwohl es sich beim Schreiben nur um einen Anhängevorgang handelt, sind wir nicht sicher, ob es sicher ist, die Datei mit mehreren Threads anzuhängen. Daher wird empfohlen, zum Schreiben eine Warteschlange und einen dedizierten Thread zu verwenden.

Sperren

Es gibt keine Sperren.

Sortieren

Standardmäßig sind die abgefragten Daten in chronologischer Reihenfolge angeordnet. Wenn Sie eine andere Sortierung benötigen, können Sie sie nach dem Speichern in den Speicher sortieren es, wie auch immer Sie wollen.


Mehr als 100 Zeilen Datenbankcode

# -*- coding:utf-8 -*-
import os
import time
import bisect
import itertools
from datetime import datetime
import logging
  
default_data_dir = './data/'
default_write_buffer_size = 1024*10
default_read_buffer_size = 1024*10
default_index_interval = 1000
  
def ensure_data_dir():
    if not os.path.exists(default_data_dir):
        os.makedirs(default_data_dir)
  
def init():
    ensure_data_dir()
  
class WawaIndex:
    def __init__(self, index_name):
        self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1)
        self.indexes, self.offsets, self.index_count = [], [], 0
        self.__load_index()
  
    def __update_index(self, key, offset):
        self.indexes.append(key)
        self.offsets.append(offset)
  
    def __load_index(self):
        self.fp_index.seek(0)
        for line in self.fp_index:
            try:
                key, offset  = line.split()
                self.__update_index(key, offset)
            except ValueError: # 索引如果没有flush的话,可能读到有半行的数据
                pass
  
    def append_index(self, key, offset):
        self.index_count += 1
        if self.index_count % default_index_interval == 0:
            self.__update_index(key, offset)
            self.fp_index.write('%s %s %s' % (key, offset, os.linesep))
  
    def get_offsets(self, begin_key, end_key):
        left = bisect.bisect_left(self.indexes, str(begin_key))
        right = bisect.bisect_left(self.indexes, str(end_key))
        left, right = left - 1, right - 1
        if left < 0: left = 0
        if right < 0: right = 0
        if right > len(self.indexes) - 1: right = len(self.indexes) - 1
        logging.debug(&#39;get_index_range:%s %s %s %s %s %s&#39;, self.indexes[0], self.indexes[-1], begin_key, end_key, left, right)
        return self.offsets[left], self.offsets[right]
  
  
class WawaDB:
    def __init__(self, db_name):
        self.db_name = db_name
        self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + &#39;.db&#39;), &#39;a&#39;, default_write_buffer_size)
        self.index = WawaIndex(db_name)
  
    def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset):
        fp_data = open(os.path.join(default_data_dir, self.db_name + &#39;.db&#39;), &#39;r&#39;, default_read_buffer_size)
        fp_data.seek(int(begin_offset))
          
        line = fp_data.readline()
        find_real_begin_offset = False
        will_read_len, read_len = int(end_offset) - int(begin_offset), 0
        while line:
            read_len += len(line)
            if (not find_real_begin_offset) and  (line < str(begin_key)):
                line = fp_data.readline()
                continue
            find_real_begin_offset = True
            if (read_len >= will_read_len) and (line > str(end_key)): break
            yield line.rstrip(&#39;\r\n&#39;)
            line = fp_data.readline()
  
    def append_data(self, data, record_time=datetime.now()):
        def check_args():
            if not data:
                raise ValueError(&#39;data is null&#39;)
            if not isinstance(data, basestring):
                raise ValueError(&#39;data is not string&#39;)
            if data.find(&#39;\r&#39;) != -1 or data.find(&#39;\n&#39;) != -1:
                raise ValueError(&#39;data contains linesep&#39;)
  
        check_args()
          
        record_time = time.mktime(record_time.timetuple())
        data = &#39;%s %s %s&#39; % (record_time, data, os.linesep)
        offset = self.fp_data_for_append.tell()
        self.fp_data_for_append.write(data)
        self.index.append_index(record_time, offset)
  
    def get_data(self, begin_time, end_time, data_filter=None):
        def check_args():
            if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)):
                raise ValueError(&#39;begin_time or end_time is not datetime&#39;)
  
        check_args()
  
        begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple())
        begin_offset, end_offset = self.index.get_offsets(begin_time, end_time)
  
        for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset):
            if data_filter:
                if data_filter(data):
                    yield data
            else:
                yield data
  
def test():
    from datetime import datetime, timedelta
    import uuid, random
    logging.getLogger().setLevel(logging.NOTSET)
  
    def time_test(test_name):
        def inner(f):
            def inner2(*args, **kargs):
                start_time = datetime.now()
                result = f(*args, **kargs)
                print &#39;%s take time:%s&#39; % (test_name, (datetime.now() - start_time))
                return result
            return inner2
        return inner
  
    @time_test(&#39;gen_test_data&#39;)   
    def gen_test_data(db):
        now = datetime.now()
        begin_time = now - timedelta(hours=5)
        while begin_time < now:
            print begin_time
            for i in range(10000):
                db.append_data(str(random.randint(1,10000))+ &#39; &#39; +str(uuid.uuid1()), begin_time)
            begin_time += timedelta(minutes=1)
      
    @time_test(&#39;test_get_data&#39;)   
    def test_get_data(db):
        begin_time = datetime.now() - timedelta(hours=3)
        end_time = begin_time + timedelta(minutes=120)
        results = list(db.get_data(begin_time, end_time, lambda x: x.find(&#39;1024&#39;) != -1))
        print &#39;test_get_data get %s results&#39; % len(results)
  
    @time_test(&#39;get_db&#39;)   
    def get_db():
        return WawaDB(&#39;test&#39;)
  
    if not os.path.exists(&#39;./data/test.db&#39;):
        db = get_db()
        gen_test_data(db)
        #db.index.fp_index.flush()
    
    db = get_db()
    test_get_data(db)
  
init()
  
if __name__ == &#39;__main__&#39;:
    test()


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn