应用MongoDB中oplog机制落到实处准实时数据的操作

时间:2019-11-26 15:58来源:澳洲幸运10产品
前言 日前有三个须求是要实时获取到新插入到MongoDB的多少,而插入程序本人已经有意气风发套管理逻辑,所以不便于直接在插入程序里写相关程序,守旧的数据库许多自带这种触发器

前言

日前有三个须求是要实时获取到新插入到MongoDB的多少,而插入程序本人已经有意气风发套管理逻辑,所以不便于直接在插入程序里写相关程序,守旧的数据库许多自带这种触发器机制,可是Mongo未有有关的函数可以用,当然还应该有点是索要python完成,于是搜聚收拾了一个一呼百诺的落真实情况势。

一、引子

第意气风发能够想到,这种需要实际上很像数据库的着力备份机制,从数据库之所以能够一起主库是因为存在有些目的来做决定,我们领悟MongoDB即使并未有现成触发器,不过它能够落实基本备份,所以大家就从它的宗旨备份机制出手。

二、OPLOG

首先,要求以master情势来张开mongod守护,命令行使用–master,可能配置文件扩大master键为true。

此刻,我们得以在Mongo的系统库local里看见新增加的collection——oplog,那时oplog.$main里就能够积攒进oplog新闻,假使此刻还也许有当作从数据库的Mongo存在,就会还或然有局地slaves的音讯,由于大家那边实际不是着力同步,所以空中楼阁此些集中。

再来看看oplog结构:

"ts" : Timestamp, 时间戳"h" : NumberLong, 长度"v" : 2, "op" : "n", 操作类型"ns" : "", 操作的库和集合"o2" : "_id" update条件"o" : {} 操作值,即document

此间需求精通op的三种属性:

insert,'i'update, 'u'remove, 'd'cmd, 'c'noop, 'n' 空操作

从地方的音信方可阅览,我们假使不断读取到ts来做相比较,然后依照op就可以推断当前边世的是怎么样操作,也就是接纳程序完毕了一个从数据库的选拔端。

三、CODE

在Github上找到了人家的落实方式,不过它的函数库太老旧,所以在她的底蕴上进展退换。

Github地址:

mongo_oplog_watcher.py如下:

#!/usr/bin/pythonimport pymongoimport reimport timefrom pprint import pprint # pretty printerfrom pymongo.errors import AutoReconnectclass OplogWatcher: def __init__(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def __get_id: id = None o2 = op.get if o2 is not None: id = o2.get if id is None: id = op['o'].get return id def start: oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort[0]['ts'] while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op['ts'] id = self.__get_id self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) time.sleep if not cursor.alive: break except AutoReconnect: time.sleep def all_with_noop(self, ns, ts, op, id, raw): if op == 'n': self.noop else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == 'i': self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw) elif op == 'u': self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw) elif op == 'd': self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == 'c': self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw) elif op == 'db': self.db_declare(ns=ns, ts=ts, raw=raw) def noop: pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare: passclass OplogPrinter: def all: pprint  print #newlineif __name__ == '__main__': OplogPrinter()

先是是促成三个数据库的初阶化,设定贰个延迟时间:

self.poll_time = poll_timeself.connection = connection or pymongo.MongoClient()

关键的函数是start(),完成一个时日的比对并开展对应字段的管理:

def start: oplog = self.connection.local['oplog.$main'] #读取之前提到的库 ts = oplog.find().sort[0]['ts'] #获取一个时间边际 while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find #对此时间之后的进行处理 while True: for op in cursor: ts = op['ts'] id = self.__get_id self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) #可以指定处理插入监控,更新监控或者删除监控等 time.sleep if not cursor.alive: break except AutoReconnect: time.sleep

巡回那几个start函数,在all_with_noop这里就可以编写相应的监察处理逻辑。

那样就能够完成多少个简便的准实时Mongo数据库操作监察和控制器,下一步就足以合作别的操作来对新入库的程序实行对应管理。

总结

如上正是这篇小说的整体内容了,希望本文的从头到尾的经过对我们的上学只怕干活能推动一定的提携,假诺有疑难我们能够留言交换,多谢大家对剧本之家的支撑。

编辑:澳洲幸运10产品 本文来源:应用MongoDB中oplog机制落到实处准实时数据的操作

关键词: