Die Replikation von MongoDB speichert Schreibvorgänge über ein Protokoll. Der folgende Artikel führt Sie hauptsächlich in die relevanten Informationen zur Verwendung des Oplog-Mechanismus in MongoDB ein, um eine Quasi-Echtzeit-Datenoperationsüberwachung zu erreichen darauf verweisen kann, werfen wir einen Blick unten.
Vorwort
In letzter Zeit besteht ein Bedarf, neu eingefügte Daten in MongoDB in Echtzeit abzurufen, und das Einfügeprogramm selbst verfügt bereits über einen Satz Daher ist es unpraktisch, verwandte Programme direkt im Einfügeprogramm zu schreiben. Die meisten herkömmlichen Datenbanken verfügen über diesen Trigger-Mechanismus, aber Mongo verfügt nicht über entsprechende Funktionen (vielleicht weiß ich es auch nicht). viel, bitte Korrektur), natürlich gibt es noch einen weiteren Punkt, der in Python implementiert werden muss, also habe ich eine entsprechende Implementierungsmethode gesammelt und kompiliert.
1. Einführung
Zunächst kann man davon ausgehen, dass diese Anforderung tatsächlich dem Master-Slave sehr ähnlich ist Daher kann die Hauptdatenbank synchronisiert werden, da es bestimmte Kontrollindikatoren gibt. Wir wissen, dass MongoDB zwar keine vorgefertigten Trigger hat, aber eine Master-Slave-Sicherung realisieren kann. Slave-Backup-Mechanismus.
2. OPLOG
Zuerst müssen Sie den Mongod-Daemon im Master-Modus öffnen, verwenden Sie –master in der Befehlszeile , oder KonfigurationsdateiFügen Sie den Hauptschlüssel zu true hinzu.
Zu diesem Zeitpunkt können wir das neue Sammlungs-Oplog in der lokalen Systembibliothek von Mongo sehen. Zu diesem Zeitpunkt werden die Oplog-Informationen in oplog.$main
gespeichert Mongo existiert, es wird einige Slave-Informationen geben, da wir hier keine Master-Slave-Synchronisation durchführen, diese Sätze existieren nicht.
Werfen wir einen Blick auf die Oplog-Struktur:
"ts" : Timestamp(6417682881216249, 1), 时间戳 "h" : NumberLong(0), 长度 "v" : 2, "op" : "n", 操作类型 "ns" : "", 操作的库和集合 "o2" : "_id" update条件 "o" : {} 操作值,即document
Hier erforderlich. Kennen Sie mehrere Attribute von op:
insert,'i' update, 'u' remove(delete), 'd' cmd, 'c' noop, 'n' 空操作
Wie aus den obigen Informationen ersichtlich ist, müssen wir ts zum Vergleich nur kontinuierlich lesen, und Dann basierend auf op Sie können feststellen, welcher Vorgang gerade ausgeführt wird. Dies entspricht der Verwendung eines Programms zum Implementieren eines Empfangsendes aus der Datenbank.
3. CODE
Ich habe die Implementierung anderer Leute auf Github gefunden, aber die Funktionsbibliothek ist zu alt, also nehmen Sie Änderungen vor über seine Arbeit.
Github-Adresse: github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py lautet wie folgt:
#!/usr/bin/python import pymongo import re import time from pprint import pprint # pretty printer from pymongo.errors import AutoReconnect class OplogWatcher(object): def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile(r'^%s\.' % db) else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def get_id(op): id = None o2 = op.get('o2') if o2 is not None: id = o2.get('_id') if id is None: id = op['o'].get('_id') return id def start(self): oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort('$natural', -1)[0]['ts'] while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time) def all_with_noop(self, ns, ts, op, id, raw): if op == 'n': self.noop(ts=ts) else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == 'i': self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw) elif op == 'u': self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw) elif op == 'd': self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == 'c': self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw) elif op == 'db': self.db_declare(ns=ns, ts=ts, raw=raw) def noop(self, ts): pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare(self, ns, ts, **kw): pass class OplogPrinter(OplogWatcher): def all(self, **kw): pprint (kw) print #newline if name == 'main': OplogPrinter()
Erstens, Implementieren Sie eine Initialisierung der Datenbank und stellen Sie eine Verzögerungszeit ein (Quasi-Echtzeit):
self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
Die Hauptfunktion ist start()
, die einen Zeitvergleich implementiert und führt entsprechende Feldverarbeitung aus:
def start(self): oplog = self.connection.local['oplog.$main'] #读取之前提到的库 ts = oplog.find().sort('$natural', -1)[0]['ts'] #获取一个时间边际 while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter) #对此时间之后的进行处理 while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) #可以指定处理插入监控,更新监控或者删除监控等 time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
Schleife diese Startfunktion, und Sie können die entsprechende Überwachungs- und Verarbeitungslogik in all_with_noop schreiben.
Auf diese Weise kann eine einfache Quasi-Echtzeit-MongoDatenbankoperationMonitor implementiert werden. Im nächsten Schritt kann das neu eingegebene Programm in Verbindung mit anderen Operationen entsprechend abgearbeitet werden.
Das obige ist der detaillierte Inhalt vonTeilen Sie ein Beispiel für die Datenüberwachung mithilfe des Oplog-Mechanismus in MongoDB. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!