MongoDB
 sql >> база данни >  >> NoSQL >> MongoDB

Как може Python да наблюдава промените в Oplog на Mongodb

Написах инкрементален инструмент за архивиране за MongoDB преди известно време в Python. Инструментът следи промените в данните чрез проследяване на oplog . Ето съответната част от кода.

Актуализиран отговор, MongDB 3.6+

Както datdinhquoc умело посочва в коментарите по-долу, за MongoDB 3.6 и по-нови има Потоци за промяна .

Актуализиран отговор, pymongo 3

from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']

    while True:
        kw = {}

        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)

        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']

                    print(doc)  # Do something with doc.

                sleep(_SLEEP)

        except AutoReconnect:
            sleep(_SLEEP)

Вижте също http://api.mongodb.com/python/current/examples /tailable.html .

Оригинален отговор, pymongo 2

from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)

        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

        try:
            while cursor.alive:
                try:
                    doc = next(cursor)

                    # Do something with doc.

                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)

        finally:
            cursor.close()


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. MongoDB агрегат в рамките на ежедневно групиране

  2. Обхватите на релсите могат да филтрират по броя на свързаните класове за дадено поле

  3. Сортиране на резултатите от заявката по реда на елементите в предоставения масив с условия в Mongoose

  4. Как да разрешите com.mongodb.spark.exceptions.MongoTypeConversionException:Не може да прехвърля... Java Spark

  5. Използване на методи на клас MongoEngine Document за персонализирано валидиране и предварително запазване на кукички