MongoDB
 sql >> база данни >  >> NoSQL >> MongoDB

Импортирайте CSV с помощта на схема Mongoose

Можете да го направите с fast-csv, като получите headers от дефиницията на схемата, която ще върне анализираните редове като "обекти". Всъщност имате някои несъответствия, така че ги маркирах с корекции:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Докато схемата действително съответства на предоставения CSV, тогава всичко е наред. Това са корекциите, които мога да видя, но ако имате нужда от действителните имена на полетата да са подравнени по различен начин, тогава трябва да коригирате. Но основно имаше Number в позицията, където има String и по същество допълнително поле, което предполагам, че е празното в CSV.

Общите неща са получаване на масива от имена на полета от схемата и предаването им в опциите, когато се прави csv екземпляр на анализатора:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

След като действително направите това, получавате обратно "Обект" вместо масив:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Не се притеснявайте за „типовете“, защото Mongoose ще прехвърля стойностите според схемата.

Останалото се случва в манипулатора за data събитие. За максимална ефективност използваме insertMany() да записвате в базата данни само веднъж на всеки 10 000 реда. Как това всъщност отива на сървъра и процесите зависи от версията на MongoDB, но 10 000 би трябвало да са доста разумни въз основа на средния брой полета, които бихте импортирали за една колекция по отношение на „компромиса“ за използване на паметта и писане на разумна заявка за мрежа. Направете числото по-малко, ако е необходимо.

Важните части са да маркирате тези извиквания като async функции и await резултатът от insertMany() преди да продължите. Също така трябва да pause() потока и resume() за всеки елемент, в противен случай рискуваме да презапишем buffer на документи, които да се вмъкнат, преди действително да бъдат изпратени. pause() и resume() са необходими за поставяне на "обратно налягане" върху тръбата, в противен случай елементите просто продължават да "излизат" и изстрелват data събитие.

Естествено контролът за 10 000 записа изисква да проверяваме това както при всяка итерация, така и при завършване на потока, за да изпразним буфера и да изпратим всички останали документи до сървъра.

Това наистина искате да направите, тъй като със сигурност не искате да задействате асинхронна заявка към сървъра и при "всяка" итерация чрез data събитие или по същество без да чака всяка заявка да завърши. Ще се разминете като не проверявате това за „много малки файлове“, но за всяко натоварване в реалния свят със сигурност ще надхвърлите стека от обаждания поради асинхронни повиквания „в полет“, които все още не са завършени.

FYI - package.json използван. mz е по избор, тъй като е просто модернизирано Promise активирана библиотека от стандартни "вградени" библиотеки на възел, които просто съм свикнал да използвам. Разбира се, кодът е напълно взаимозаменяем с fs модул.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Всъщност с Node v8.9.x и по-нови, тогава можем дори да направим това много по-лесно с реализация на AsyncIterator чрез stream-to-iterator модул. Все още е в Iterator<Promise<T>> режим, но трябва да стане, докато Node v10.x стане стабилен LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

По принцип цялата обработка на "събития" в потока, пауза и възобновяване се заменят с обикновен for цикъл:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Лесно! Това се изчиства при по-късна реализация на възел с for..await..of когато стане по-стабилен. Но горното работи добре на посочената версия и по-горе.



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Използване на S3 като база данни срещу база данни (напр. MongoDB)

  2. Как да предадете ObjectId от MongoDB в MVC.net

  3. Отидете:Създайте io.Writer интерфейс за регистриране в базата данни mongodb

  4. MongoDB проекция на вложени масиви

  5. MongoDB:Използване на съвпадение с променливи на входния документ