MongoDB's Replication stores write operations through a log. This log is called oplog. The following article mainly introduces you to the relevant information on using the oplog mechanism in MongoDB to achieve quasi-real-time data operation monitoring. What is needed Friends can refer to it, let’s take a look below.
Preface
Recently there is a need to obtain newly inserted data into MongoDB in real time, and the insertion program itself already has a set of processing logic , so it is inconvenient to write related programs directly in the insertion program. Most traditional databases come with this trigger mechanism, but Mongo does not have related functions to use (maybe I don’t know too much, please Correction), of course, there is another point that needs to be implemented in python, so I collected and compiled a corresponding implementation method.
1. Introduction
#First of all, it can be thought that this requirement is actually very similar to the master-slave backup mechanism of the database. Therefore, the main database can be synchronized because there are certain indicators for control. We know that although MongoDB does not have ready-made triggers, it can realize master-slave backup, so we start with its master-slave backup mechanism.
2. OPLOG
First of all, you need to open the mongod daemon in master mode. Use the command line –master, or Configuration fileAdd the master key to true.
At this time, we can see the new collection-oplog in the local system library of Mongo. At this time, the oplog information will be stored in oplog.$main
. If this If Mongo exists as a slave database, there will also be some slave information. Since we are not master-slave synchronization here, these sets do not exist.
Let’s take a look at the oplog structure:
"ts" : Timestamp(6417682881216249, 1), 时间戳 "h" : NumberLong(0), 长度 "v" : 2, "op" : "n", 操作类型 "ns" : "", 操作的库和集合 "o2" : "_id" update条件 "o" : {} 操作值,即document
You need to know the op here Several attributes:
insert,'i' update, 'u' remove(delete), 'd' cmd, 'c' noop, 'n' 空操作
As can be seen from the above information, we only need to continuously read ts for comparison, and then judge the current situation based on the op What operation occurs is equivalent to using a program to implement a receiving end from the database.
3. CODE
I found someone else’s implementation on Github, but its function library is too old, so Make modifications based on his work.
Github address: github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py is as follows:
#!/usr/bin/python import pymongo import re import time from pprint import pprint # pretty printer from pymongo.errors import AutoReconnect class OplogWatcher(object): def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile(r'^%s\.' % db) else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def get_id(op): id = None o2 = op.get('o2') if o2 is not None: id = o2.get('_id') if id is None: id = op['o'].get('_id') return id def start(self): oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort('$natural', -1)[0]['ts'] while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time) def all_with_noop(self, ns, ts, op, id, raw): if op == 'n': self.noop(ts=ts) else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == 'i': self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw) elif op == 'u': self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw) elif op == 'd': self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == 'c': self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw) elif op == 'db': self.db_declare(ns=ns, ts=ts, raw=raw) def noop(self, ts): pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare(self, ns, ts, **kw): pass class OplogPrinter(OplogWatcher): def all(self, **kw): pprint (kw) print #newline if name == 'main': OplogPrinter()
First, implement a database Initialization, set a delay time (quasi real-time):
self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
The main function is start()
, to achieve a time comparison and perform Processing of corresponding fields:
def start(self): oplog = self.connection.local['oplog.$main'] #读取之前提到的库 ts = oplog.find().sort('$natural', -1)[0]['ts'] #获取一个时间边际 while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter) #对此时间之后的进行处理 while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) #可以指定处理插入监控,更新监控或者删除监控等 time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
Loop this start function, and write the corresponding monitoring and processing logic here in all_with_noop.
In this way, a simple quasi-real-time Mongodatabase operationmonitor can be implemented. In the next step, the newly entered program can be processed accordingly in conjunction with other operations.
The above is the detailed content of Share an example of data monitoring using the oplog mechanism in MongoDB. For more information, please follow other related articles on the PHP Chinese website!