Map-reduce е може би най-гъвкавата от операциите за агрегиране, които MongoDB поддържа.
Map-Reduce е популярен модел за програмиране, който произхожда от Google за паралелна обработка и агрегиране на големи обеми данни. Подробно обсъждане на Map-Reduce е извън обхвата на тази статия, но по същество това е многоетапен процес на агрегиране. Най-важните две стъпки са етапът на картата (обработване на всеки документ и издаване на резултати) и етапът на намаляване (събиране на резултатите, излъчвани по време на етапа на картата).
MongoDB поддържа три вида операции за агрегиране:Map-Reduce, конвейер за агрегиране и единствени команди за агрегиране. Можете да използвате този документ за сравнение на MongoDB, за да видите кой отговаря на вашите нужди.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
В последната ми публикация видяхме, с примери, как да стартирате конвейери за агрегиране на вторични. В тази публикация ще преминем през изпълнение на задания Map-Reduce на вторичните реплики на MongoDB.
MongoDB Map-Reduce
MongoDB поддържа изпълнение на задания Map-Reduce на сървърите на базата данни. Това предлага гъвкавостта за писане на сложни задачи за агрегиране, които не се извършват толкова лесно чрез тръбопроводи за агрегиране. MongoDB ви позволява да пишете персонализирана карта и функции за намаляване в Javascript, които могат да бъдат предадени на базата данни чрез Mongo shell или всеки друг клиент. При големи и постоянно нарастващи набори от данни може дори да се помисли за стартиране на постепенни задачи за намаляване на картата, за да се избегне всеки път обработването на по-стари данни.
Исторически погледнато, картата и методите за намаляване, използвани са се изпълнявали в еднонишков контекст. Това ограничение обаче беше премахнато във версия 2.4.
Защо да изпълнявате задания Map-Reduce на вторичния?
Подобно на други задачи за агрегиране, Map-Reduce също е ресурсоемка „партидна“ работа, така че е подходяща за изпълнение на реплики само за четене. Предупрежденията при това са:
1) Би трябвало да е добре да използвате леко остарели данни. Или можете да настроите загрижеността за запис, за да гарантирате, че репликите винаги са в синхрон с основния. Тази втора опция предполага, че приемането на удар върху производителността на запис е приемливо.
2) Резултатът от заданието Map-Reduce не трябва да се записва в друга колекция в базата данни, а по-скоро да се връща в приложението (т.е. не се записва в базата данни).
Нека да разгледаме как да направите това чрез примери, както от mongo shell, така и от драйвера на Java.
Намаляване на карта на набори от реплики
Набор от данни
За илюстрация ще използваме доста прост набор от данни:ежедневен изпис на транзакции от търговец на дребно. Примерен запис изглежда така:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
В нашите примери ще изчислим общите разходи на даден клиент за този ден. По този начин, като се има предвид нашата схема, методите за карта и редуциране ще изглеждат така:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
С установената ни схема, нека разгледаме Map-Reduce в действие.
MongoDB Shell
За да се гарантира, че задачата Map-Reduce се изпълнява на вторичния елемент, предпочитанието за четене трябва да бъде настроено на вторично . Както казахме по-горе, за да може Map-Reduce да се изпълнява на вторичен, изходът на резултата трябва да бъде inline (Всъщност това е единствената изходяща стойност, разрешена за вторични). Нека видим как работи.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
Един поглед към регистрационните файлове на вторичния елемент потвърждава, че задачата наистина се изпълнява на вторичния.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
Сега нека се опитаме да изпълним задание Map-Reduce върху прочетените реплики от Java приложение. В драйвера на MongoDB Java, настройката на предпочитание за четене върши работа. Резултатът е вграден по подразбиране, така че не е необходимо да се предават допълнителни параметри. Ето пример за използване на драйвер версия 3.2.2:
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Както е видно от регистрационните файлове, задачата се изпълняваше на вторичния:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce на разчленени клъстери
MongoDB поддържа Map-Reduce на разчленени клъстери, както когато разчленената колекция е входът, така и когато е изходът на задача Map-Reduce. Въпреки това, MongoDB понастоящем не поддържа изпълнение на задания за намаляване на картата на вторични елементи на разчленен клъстер. Така че дори ако опцията out е настроен на inline , Map-Reduce заданията винаги ще се изпълняват на първичните части на разчленен клъстер. Този проблем се проследява чрез тази грешка в JIRA.
Синтаксисът на изпълнение на задача Map-Reduce в раздробен клъстер е същият като този на набор от реплика. Така че примерите, предоставени в горния раздел, са валидни. Ако горният пример на Java се изпълнява на разчленен клъстер, на първичните се появяват регистрационни съобщения, показващи, че командата е изпълнена там.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Моля, посетете нашата продуктова страница на MongoDB, за да научите за нашия обширен списък с функции.