Успях да използвам Change Streams и TTL, за да емулирам cronjob. Публикувах публикация, обясняваща подробно какво съм направил, и дадох кредити на:https://www. patreon.com/posts/17697287
Но по принцип всеки път, когато трябва да планирам „събитие“ за документ, когато създавам документа, създавам паралелно и документ за събитие. Този документ за събитие ще има като свой _id същия идентификатор на първия документ.
Освен това за този документ за събитието ще задам TTL.
Когато TTL изтече, ще заснема неговата промяна за „изтриване“ с Change Streams. И след това ще използвам documentKey на промяната (тъй като е същият идентификатор като документа, който искам да задействам), за да намеря целевия документ в първата колекция и да правя всичко, което искам с документа.
Използвам Node.js с Express и Mongoose за достъп до MongoDB. Ето съответната част, която трябва да се добави в App.js:
const { ReplSet } = require('mongodb-topology-manager');
run().catch(error => console.error(error));
async function run() {
console.log(new Date(), 'start');
const bind_ip = 'localhost';
// Starts a 3-node replica set on ports 31000, 31001, 31002, replica set
// name is "rs0".
const replSet = new ReplSet('mongod', [
{ options: { port: 31000, dbpath: `${__dirname}/data/db/31000`, bind_ip } },
{ options: { port: 31001, dbpath: `${__dirname}/data/db/31001`, bind_ip } },
{ options: { port: 31002, dbpath: `${__dirname}/data/db/31002`, bind_ip } }
], { replSet: 'rs0' });
// Initialize the replica set
await replSet.purge();
await replSet.start();
console.log(new Date(), 'Replica set started...');
// Connect to the replica set
const uri = 'mongodb://localhost:31000,localhost:31001,localhost:31002/' + 'test?replicaSet=rs0';
await mongoose.connect(uri);
var db = mongoose.connection;
db.on('error', console.error.bind(console, 'connection error:'));
db.once('open', function () {
console.log("Connected correctly to server");
});
// To work around "MongoError: cannot open $changeStream for non-existent database: test" for this example
await mongoose.connection.createCollection('test');
// *** we will add our scheduler here *** //
var Item = require('./models/item');
var ItemExpiredEvent = require('./models/scheduledWithin');
let deleteOps = {
$match: {
operationType: "delete"
}
};
ItemExpiredEvent.watch([deleteOps]).
on('change', data => {
// *** treat the event here *** //
console.log(new Date(), data.documentKey);
Item.findById(data.documentKey, function(err, item) {
console.log(item);
});
});
// The TTL set in ItemExpiredEvent will trigger the change stream handler above
console.log(new Date(), 'Inserting item');
Item.create({foo:"foo", bar: "bar"}, function(err, cupom) {
ItemExpiredEvent.create({_id : item._id}, function(err, event) {
if (err) console.log("error: " + err);
console.log('event inserted');
});
});
}
И ето кода за модел/ScheduledWithin:
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
var ScheduledWithin = new Schema({
_id: mongoose.Schema.Types.ObjectId,
}, {timestamps: true});
// timestamps: true will automatically create a "createdAt" Date field
ScheduledWithin.index({createdAt: 1}, {expireAfterSeconds: 90});
module.exports = mongoose.model('ScheduledWithin', ScheduledWithin);