Можете да го направите с fast-csv, като получите headers
от дефиницията на схемата, която ще върне анализираните редове като "обекти". Всъщност имате някои несъответствия, така че ги маркирах с корекции:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Докато схемата действително съответства на предоставения CSV, тогава всичко е наред. Това са корекциите, които мога да видя, но ако имате нужда от действителните имена на полетата да са подравнени по различен начин, тогава трябва да коригирате. Но основно имаше Number
в позицията, където има String
и по същество допълнително поле, което предполагам, че е празното в CSV.
Общите неща са получаване на масива от имена на полета от схемата и предаването им в опциите, когато се прави csv екземпляр на анализатора:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
След като действително направите това, получавате обратно "Обект" вместо масив:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Не се притеснявайте за „типовете“, защото Mongoose ще прехвърля стойностите според схемата.
Останалото се случва в манипулатора за data
събитие. За максимална ефективност използваме insertMany()
да записвате в базата данни само веднъж на всеки 10 000 реда. Как това всъщност отива на сървъра и процесите зависи от версията на MongoDB, но 10 000 би трябвало да са доста разумни въз основа на средния брой полета, които бихте импортирали за една колекция по отношение на „компромиса“ за използване на паметта и писане на разумна заявка за мрежа. Направете числото по-малко, ако е необходимо.
Важните части са да маркирате тези извиквания като async
функции и await
резултатът от insertMany()
преди да продължите. Също така трябва да pause()
потока и resume()
за всеки елемент, в противен случай рискуваме да презапишем buffer
на документи, които да се вмъкнат, преди действително да бъдат изпратени. pause()
и resume()
са необходими за поставяне на "обратно налягане" върху тръбата, в противен случай елементите просто продължават да "излизат" и изстрелват data
събитие.
Естествено контролът за 10 000 записа изисква да проверяваме това както при всяка итерация, така и при завършване на потока, за да изпразним буфера и да изпратим всички останали документи до сървъра.
Това наистина искате да направите, тъй като със сигурност не искате да задействате асинхронна заявка към сървъра и при "всяка" итерация чрез data
събитие или по същество без да чака всяка заявка да завърши. Ще се разминете като не проверявате това за „много малки файлове“, но за всяко натоварване в реалния свят със сигурност ще надхвърлите стека от обаждания поради асинхронни повиквания „в полет“, които все още не са завършени.
FYI - package.json
използван. mz
е по избор, тъй като е просто модернизирано Promise
активирана библиотека от стандартни "вградени" библиотеки на възел, които просто съм свикнал да използвам. Разбира се, кодът е напълно взаимозаменяем с fs
модул.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Всъщност с Node v8.9.x и по-нови, тогава можем дори да направим това много по-лесно с реализация на AsyncIterator
чрез stream-to-iterator
модул. Все още е в Iterator<Promise<T>>
режим, но трябва да стане, докато Node v10.x стане стабилен LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
По принцип цялата обработка на "събития" в потока, пауза и възобновяване се заменят с обикновен for
цикъл:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Лесно! Това се изчиства при по-късна реализация на възел с for..await..of
когато стане по-стабилен. Но горното работи добре на посочената версия и по-горе.