Наскоро MongoDB пусна нова функция, започваща от версия 3.6, Change Streams. Това ви дава незабавен достъп до вашите данни, което ви помага да сте в крак с промените в данните си. В днешния свят всеки иска незабавни известия, вместо да ги получава след няколко часа или минути. За някои приложения е изключително важно да изпращате известия в реално време до всички абонирани потребители за всяка актуализация. MongoDB направи този процес наистина лесен, като представи тази функция. В тази статия ще научим за потока за промени в MongoDB и неговите приложения с някои примери.
Дефиниране на потоци от промени
Потоците от промени не са нищо друго освен потокът в реално време на всякакви промени, които се случват в базата данни или колекцията или дори в разгръщанията. Например, когато се появи някаква актуализация (Вмъкване, Актуализиране или Изтриване) в конкретна колекция, MongoDB задейства събитие за промяна с всички данни, които са били променени.
Можете да дефинирате потоци от промени във всяка колекция, точно както всички други нормални оператори за агрегиране, като използвате оператор $changeStream и метод watch(). Можете също да дефинирате поток от промени, като използвате метода MongoCollection.watch().
Пример
db.myCollection.watch()
Промяна на функциите на потоците
-
Филтриране на промени
Можете да филтрирате промените, за да получавате известия за събития само за някои целеви данни.
Пример:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Този код ще гарантира, че получавате актуализации само за записи, чието име е равно на Bob. По този начин можете да напишете всякакви конвейери за филтриране на потоците от промени.
-
Възобновяване на потоците от промени
Тази функция гарантира, че няма загуба на данни в случай на повреда. Всеки отговор в потока съдържа маркера за възобновяване, който може да се използва за рестартиране на потока от определена точка. При някои чести мрежови сривове драйверът на mongodb ще се опита да възстанови връзката с абонатите, използвайки най-новия токен за възобновяване. Въпреки че, в случай на пълен отказ на приложението, токенът за възобновяване трябва да се поддържа от клиентите, за да възобнови потока.
-
Подредени потоци от промени
MongoDB използва глобален логически часовник, за да подреди всички събития от потока от промени във всички реплики и фрагменти на всеки клъстер, така че получателят винаги ще получава известията в същия ред, в който командите са били приложени към базата данни.
-
Събития с пълни документи
MongoDB връща частта от съответстващите документи по подразбиране. Но можете да промените конфигурацията на потока за промяна, за да получите пълен документ. За да направите това, предайте { fullDocument:“updateLookup”} към метода за гледане.
Пример:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Издръжливост
Потоците за промени ще уведомяват само за данните, които са ангажирани с по-голямата част от репликите. Това ще гарантира, че събитията се генерират от мнозинство постоянни данни, което гарантира трайността на съобщението.
-
Сигурност/контрол на достъпа
Потоците за промени са много сигурни. Потребителите могат да създават потоци от промени само в колекциите, за които имат разрешения за четене. Можете да създавате потоци за промени въз основа на ролите на потребителите.
Пример за потоци от промени
В този пример ще създадем потоци от промени в колекцията от акции, за да получаваме известия, когато цената на акциите надхвърли всеки праг.
-
Настройте клъстера
За да използваме потоци за промяна, първо трябва да създадем набор от реплики. Изпълнете следната команда, за да създадете набор от реплики с един възел.
mongod --dbpath ./data --replSet “rs”
-
Вмъкнете някои записи в колекцията от акции
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Настройте средата на възел и инсталирайте зависимости
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Абонирайте се за промените
Създайте един файл index.js и поставете следния код в него.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Сега стартирайте този файл:
node index.js
-
Вмъкнете нов запис в db, за да получите актуализация
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Сега проверете конзолата си, ще получите актуализация от MongoDB.
Примерен отговор:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Тук можете да промените стойността на параметъра operationType със следните операции, за да слушате за различни типове промени в колекция:
- Вмъкване
- Замяна (освен уникален идентификатор)
- Актуализиране
- Изтриване
- Невалидно (Винаги, когато Mongo върне невалиден курсор)
Други режими на потоци за промени
Можете да започнете да променяте потоци срещу база данни и да разгръщате по същия начин, както срещу колекция. Тази функция е пусната от MongoDB версия 4.0. Ето командите за отваряне на поток от промени спрямо база данни и разгръщания.
Against DB: db.watch()
Against deployment: Mongo.watch()
Заключение
MongoDB Change Streams опростява интеграцията между интерфейса и бекенда в реално време и безпроблемно. Тази функция може да ви помогне да използвате MongoDB за pubsub модел, така че вече не е необходимо да управлявате внедряванията на Kafka или RabbitMQ. Ако приложението ви изисква информация в реално време, тогава трябва да проверите тази функция на MongoDB. Надявам се тази публикация да ви помогне да започнете с потоците за промяна на MongoDB.