Добре дошли в стрийминг. Това, което наистина искате, е „поток със събития“, който обработва входа ви „един по един“ и, разбира се, в идеалния случай чрез общ разделител, като например знака „нов ред“, който използвате в момента.
За наистина ефективни неща можете да добавите използване на MongoDB "Bulk API" вмъква, за да направи зареждането ви възможно най-бързо, без да изяжда цялата памет на машината или цикли на процесора.
Не се застъпвам, тъй като има различни налични решения, но ето списък, който използва line- пакет за входен поток за да направите частта "завършващ ред" проста.
Дефиниции на схеми само чрез „пример“:
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
Така че обикновено интерфейсът "поток" там "разбива входа", за да обработва "ред по ред". Това ви спира да зареждате всичко наведнъж.
Основните части са "API за групови операции" от MongoDB. Това ви позволява да "наредите" много операции наведнъж, преди действително да изпратите до сървъра. Така че в този случай с използването на "modulo", записи се изпращат само на 1000 обработени записа. Наистина можете да правите всичко до лимита от 16 MB BSON, но го поддържайте управляем.
В допълнение към операциите, които се обработват групово, има допълнителен „ограничител“ от async библиотека. Наистина не е задължително, но това гарантира, че по същество не повече от „ограничението по модул“ документи се обработват по всяко време. Общите партидни „вмъквания“ не са без разходи за IO, освен памет, но извикванията „изпълнение“ означават, че IO се обработва. Така че чакаме, вместо да редим повече неща.
Със сигурност има по-добри решения, които можете да намерите за „поточно обработване“ на данни от тип CSV, което изглежда е това. Но като цяло това ви дава концепциите как да направите това по ефективен за паметта начин, без да изяждате циклите на процесора.