SQL работи с и връща таблични данни (или релации, ако предпочитате да мислите за това по този начин, но не всички SQL таблици са релации). Това означава, че вложена таблица като изобразената във въпроса не е толкова често срещана характеристика. Има начини да се създаде нещо подобно в Postgresql, например с помощта на масиви от JSON или композити, но е напълно възможно просто да извлечете таблични данни и да извършите влагането в приложението. Python има itertools.groupby()
, което се вписва доста добре, предвид сортирани данни.
Грешката column "incoming.id" must appear in the GROUP BY clause...
казва, че неагрегираните елементи в списъка за избор, клауза за наличие и т.н. трябва да се показват в GROUP BY
клауза или да се използват в агрегат, за да не имат евентуално неопределени стойности . С други думи, стойността трябва да бъде избрана само от някой ред в групата, защото GROUP BY
кондензира групираните редове в един ред , и всеки може да познае от кой ред са избрани. Реализацията може да позволи това, както SQLite прави и MySQL, но SQL стандартът забранява това. Изключението от правилото е, когато има функционална зависимост
; GROUP BY
клауза определя неагрегираните. Помислете за съединение между таблици A и B групирани по A първичен ключ. Без значение кой ред в група, системата ще избере стойностите за A колони от, те ще бъдат еднакви, тъй като групирането е извършено въз основа на първичния ключ.
За да се отговори на общия предвиден подход от 3 точки, един от начините би бил да се избере обединение на входящи и изходящи, подредени по техните времеви марки. Тъй като няма йерархия на наследяване настройка––тъй като може дори да няма такъв, не съм запознат със счетоводството––връщането към използване на Core и обикновени кортежи с резултати прави нещата по-лесни в този случай:
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
След това, за да се формира вложената структура itertools.groupby()
се използва:
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Крайният резултат е списък от 2 кортежа с дата и списък с речници на записи във възходящ ред. Не е съвсем ORM решение, но върши работата. Пример:
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
...: timestamp=datetime.utcnow() - timedelta(days=i))
...: for i in range(3)])
...:
In [57]: session.commit()
In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
...: where(Incoming.accountID == 1)
...:
...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
...: where(Outgoing.accountID == 1)
...:
...: all_entries = incoming.union(outgoing)
...: all_entries = all_entries.order_by(all_entries.c.timestamp)
...: all_entries = db_session.execute(all_entries)
In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
Както споменахме, Postgresql може да произведе почти същия резултат като използването на масив от JSON:
from sqlalchemy.dialects.postgresql import aggregate_order_by
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
where(Outgoing.accountID == accountID)
all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)
stmt = select([day,
func.array_agg(aggregate_order_by(
func.row_to_json(literal_column('all_entries.*')),
all_entries.c.timestamp))]).\
group_by(day).\
order_by(day)
db_session.execute(stmt).fetchall()
Ако всъщност Incoming
и Outgoing
могат да се разглеждат като деца на обща база, например Entry
, използването на обединения може да бъде донякъде автоматизирано с наследяване на конкретна таблица
:
from sqlalchemy.ext.declarative import AbstractConcreteBase
class Entry(AbstractConcreteBase, Base):
pass
class Incoming(Entry):
__tablename__ = 'incoming'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="incomings")
__mapper_args__ = {
'polymorphic_identity': 'incoming',
'concrete': True
}
class Outgoing(Entry):
__tablename__ = 'outgoing'
id = Column(Integer, primary_key=True)
accountID = Column(Integer, ForeignKey('account.id'))
amount = Column(Float, nullable=False)
description = Column(Text, nullable=False)
timestamp = Column(TIMESTAMP, nullable=False)
account = relationship("Account", back_populates="outgoings")
__mapper_args__ = {
'polymorphic_identity': 'outgoing',
'concrete': True
}
За съжаление използвайки AbstractConcreteBase
изисква ръчно извикване на configure_mappers()
когато са дефинирани всички необходими класове; в този случай най-ранната възможност е след дефинирането на User
, защото Account
зависи от него чрез връзки:
from sqlalchemy.orm import configure_mappers
configure_mappers()
След това, за да извлечете всички Incoming
и Outgoing
в една полиморфна ORM заявка използвайте Entry
:
session.query(Entry).\
filter(Entry.accountID == accountID).\
order_by(Entry.timestamp).\
all()
и продължете да използвате itertools.groupby()
както по-горе в получения списък с Incoming
и Outgoing
.