db.collection.aggregate(
[
{
"$addFields": {
"indexes": {
"$range": [
0,
{
"$size": "$time_series"
}
]
},
"reversedSeries": {
"$reverseArray": "$time_series"
}
}
},
{
"$project": {
"derivatives": {
"$reverseArray": {
"$slice": [
{
"$map": {
"input": {
"$zip": {
"inputs": [
"$reversedSeries",
"$indexes"
]
}
},
"in": {
"$subtract": [
{
"$arrayElemAt": [
"$$this",
0
]
},
{
"$arrayElemAt": [
"$reversedSeries",
{
"$add": [
{
"$arrayElemAt": [
"$$this",
1
]
},
1
]
}
]
}
]
}
}
},
{
"$subtract": [
{
"$size": "$time_series"
},
1
]
}
]
}
},
"time_series": 1
}
}
]
)
Можем да използваме конвейера по-горе във версия 3.4+, за да направим това. В конвейера използваме $addFields
етап на тръбопровод. оператор за добавяне на масива от индекса на елементите на "time_series" за документ, ние също обърнахме масива от времеви серии и го добавихме към документа, използвайки съответно $range
и $reverseArray
оператори
Обърнахме масива тук, защото елементът на позиция p
в масива винаги е по-голямо от елемента на позиция p+1
което означава, че [p] - [p+1] <0
и не искаме да използваме $multiply
тук (вижте тръбопровода за версия 3.2)
След това $zipped
данните от времевия ред с масива от индекси и приложи изважданекод>
израз към резултантния масив с помощта на $map
оператор.
След това $slice
резултатът да отхвърли null/None
стойност от масива и обърна отново резултата.
В 3.2 можем да използваме $unwind
оператор за развиване нашия масив и включете индекса на всеки елемент в масива, като посочите документ като операнд вместо традиционния „път“ с префикс $ .
След това в процеса трябва да $group
нашите документи и използвайте $push
акумулаторен оператор за връщане на масив от поддокументи, които изглеждат така:
{
"_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
"time_series" : [
{ "value" : 10, "index" : NumberLong(0) },
{ "value" : 20, "index" : NumberLong(1) },
{ "value" : 40, "index" : NumberLong(2) },
{ "value" : 70, "index" : NumberLong(3) },
{ "value" : 110, "index" : NumberLong(4) }
]
}
Накрая идва $project
сцена. На този етап трябва да използваме $mapкод>
оператор за прилагане на поредица от изрази към всеки елемент в новоизчисления масив в $group
етап.
Ето какво се случва вътре в $map
(вижте $map
като for цикъл)in израз:
За всеки поддокумент ние присвояваме стойността поле към променлива с помощта на $let
променлив оператор. След това изваждаме неговата стойност от стойността на полето „стойност“ на следващия елемент в масива.
Тъй като следващият елемент в масива е елементът с текущия индекс плюс едно, всичко, от което се нуждаем, е помощта на $arrayElemAt
оператор и прост $add
ция на индекса на текущия елемент и 1
.
$subtract
изразът връща отрицателна стойност, така че трябва да умножим стойността по -1
използвайки $multiply
оператор.
Също така трябва да $filter
полученият масив, защото последният елемент е Няма
или null
. Причината е, че когато текущият елемент е последният елемент, $subtract
върне Няма
тъй като индексът на следващия елемент е равен на размера на масива.
db.collection.aggregate([
{
"$unwind": {
"path": "$time_series",
"includeArrayIndex": "index"
}
},
{
"$group": {
"_id": "$_id",
"time_series": {
"$push": {
"value": "$time_series",
"index": "$index"
}
}
}
},
{
"$project": {
"time_series": {
"$filter": {
"input": {
"$map": {
"input": "$time_series",
"as": "el",
"in": {
"$multiply": [
{
"$subtract": [
"$$el.value",
{
"$let": {
"vars": {
"nextElement": {
"$arrayElemAt": [
"$time_series",
{
"$add": [
"$$el.index",
1
]
}
]
}
},
"in": "$$nextElement.value"
}
}
]
},
-1
]
}
}
},
"as": "item",
"cond": {
"$gte": [
"$$item",
0
]
}
}
}
}
}
])
Друг вариант, който според мен е по-малко ефективен, е извършване на операция map/reduce върху нашата колекция с помощта на map_reduce
метод.
>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
... function() {
... var derivatives = [];
... for (var index=1; index<this.time_series.length; index++) {
... derivatives.push(this.time_series[index] - this.time_series[index-1]);
... }
... emit(this._id, derivatives);
... }
... """)
>>> reducer = Code("""
... function(key, value) {}
... """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
... print(res) # or do something with the document.
...
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}
Можете също така да извлечете целия документ и да използвате numpy.diff
за да върне производната по този начин:
import numpy as np
for document in collection.find({}, {'time_series': 1}):
result = np.diff(document['time_series'])