Mysql
 sql >> база данни >  >> RDS >> Mysql

Използване на Python и MySQL в ETL процеса:Използване на Python и SQLAlchemy

В предишните две статии от тази серия обсъдихме как да използваме Python и SQLAlchemy за изпълнение на ETL процеса. Днес ще направим същото, но този път използвайки Python и SQL Alchemy без SQL команди в текстов формат. Това ще ни позволи да използваме SQLAlchemy независимо от двигателя на базата данни, към който сме свързани. И така, да започнем.

Днес ще обсъдим как да изпълним ETL процеса с помощта на Python и SQLAlchemy. Ще създадем скрипт за извличане на ежедневни данни от нашата оперативна база данни, ще ги трансформираме и след това ще заредим в нашия склад за данни.

Това е третата статия от поредицата. Ако не сте чели първите две статии (Използване на Python и MySQL в ETL процеса и SQLAlchemy), силно ви препоръчвам да го направите, преди да продължите.

Цялата тази серия е продължение на нашата серия от хранилища за данни:

  • Създаване на DWH, част първа:модел на бизнес данни за абонамент
  • Създаване на DWH, част втора:модел на бизнес данни за абонамент
  • Създаване на хранилище за данни, част 3:Модел на бизнес данни за абонамент

Добре, сега да започнем с днешната тема. Първо, нека разгледаме моделите на данни.

Моделите на данни



Модел на данни от оперативна (на живо) база данни




DWH модел на данни


Това са двата модела на данни, които ще използваме. За повече информация относно складовете за данни (DWHs), вижте тези статии:

  • Звездната схема
  • Схема на снежинката
  • Звездна схема срещу схема със снежинка

Защо SQLAlchemy?

Цялата идея зад SQLAlchemy е, че след като импортираме бази данни, не се нуждаем от SQL код, който е специфичен за свързаната база данни. Вместо това можем да импортираме обекти в SQLAlchemy и да използваме синтаксиса на SQLAlchemy за изрази. Това ще ни позволи да използваме един и същ език, независимо към коя база данни сме свързани. Основното предимство тук е, че разработчикът не трябва да се грижи за разликите между различните машини за бази данни. Вашата програма SQLAlchemy ще работи точно по същия начин (с незначителни промени), ако мигрирате към друга база данни.

Реших да използвам само команди на SQLAlchemy и списъци на Python за комуникация с временно хранилище и между различни бази данни. Основните причини за това решение са, че 1) списъците на Python са добре познати и 2) кодът ще бъде четим за тези без умения за Python.

Това не означава, че SQLAlchemy е перфектен. Той има определени ограничения, които ще обсъдим по-късно. Засега нека просто да разгледаме кода по-долу:

Изпълнение на скрипта и резултата

Това е командата на Python, използвана за извикване на нашия скрипт. Скриптът проверява данните в оперативната база данни, сравнява стойностите с DWH и импортира новите стойности. В този пример актуализираме стойности в таблици с две измерения и една таблица с факти; скриптът връща подходящия изход. Целият скрипт е написан така, че да можете да го изпълнявате няколко пъти на ден. Той ще изтрие „старите“ данни за този ден и ще ги замени с нови.

Нека анализираме целия скрипт, като започнем отгоре.

Импортиране на SQLAlchemy

Първото нещо, което трябва да направим, е да импортираме модулите, които ще използваме в скрипта. Обикновено ще импортирате модулите си, докато пишете скрипта. В повечето случаи няма да знаете точно кои модули ще ви трябват в самото начало.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Импортирахме datetime на Python модул, който ни снабдява с класове, които работят с дати.

След това имаме sqlalchemy модул. Няма да импортираме целия модул, а само нещата, от които се нуждаем – някои специфични за SQLAlchemy (create_engine , MetaData , Table ), някои части на SQL изрази (select , and_ , case ) и func , което ни позволява да използваме функции като count() и sum() .

Свързване с базите данни

Ще трябва да се свържем с две бази данни на нашия сървър. Можем да се свържем с повече бази данни (MySQL, SQL Server или всякакви други) от различни сървъри, ако е необходимо. В този случай и двете бази данни са MySQL бази данни и се съхраняват на моята локална машина.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Създадохме два механизма и две връзки. Тук няма да навлизам в подробности, защото вече сме обяснили това в предишната статия.

Актуализиране на dim_time Размер

Цел:Вмъкнете вчерашната дата, ако още не е вмъкната в таблицата.

В нашия скрипт ще актуализираме таблици с две измерения с нови стойности. Останалите следват същия модел, така че ще разгледаме това само веднъж; не е нужно да записваме почти идентичен код още няколко пъти.

Идеята е много проста. Винаги ще изпълняваме скрипта, за да вмъкнем нови данни за вчера. Следователно, трябва да проверим дали тази дата е била вмъкната в таблицата с измерения. Ако вече е там, няма да правим нищо; ако не е, ще го добавим. Нека да разгледаме кода, за да актуализираме dim_time таблица.

Първо, ще проверим дали датата съществува. Ако не съществува, ще го добавим. Започваме със съхраняване на вчерашната дата в променлива. В Python го правите по следния начин:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

Първият ред взема текуща дата, преобразува я в числова стойност, изважда 1 от тази стойност и преобразува тази числова стойност обратно в дата (вчера =днес – 1 ). Вторият ред съхранява датата в текстов формат.

След това ще тестваме дали датата вече е в базата данни:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

След като заредим таблицата, ще изпълним заявка, която трябва да върне всички редове от таблицата с измерения, където стойността на час/дата е равна на вчера. Резултатът може да има 0 (няма такава дата в таблицата) или 1 ред (датата вече е в таблицата).

Ако датата все още не е в таблицата, ще използваме командата insert(), за да я добавим:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Едно ново нещо тук, което бих искал да посоча, е използването на. .year , .month , .isocalendar()[1] и .weekday за да получите части от дати.

Актуализиране на dim_city Размер

Цел:Вмъкнете нови градове, ако има такива (т.е. сравнете списъка с градове в базата данни на живо със списъка с градове в DWH и добавете липсващи).

Актуализиране на dim_time измерението беше доста просто. Просто тествахме дали дадена дата е в таблицата и я вмъкнахме, ако вече не е там. За да тестваме стойност в базата данни DWH, използвахме променлива на Python (вчера ). Ще използваме този процес отново, но този път със списъци.

Тъй като няма лесен начин за комбиниране на таблици от различни бази данни в една SQLAlchemy заявка, не можем да използваме подхода, описан в Част 1 от тази серия. Следователно ще ни е необходим обект, който да съхранява стойностите, необходими за комуникация между тези две бази данни. Реших да използвам списъци, защото те са често срещани и вършат работата.

Първо, ще заредим country и city таблици от жива база данни в съответните обекти.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

След това ще заредим dim_city таблица от DWH в списък:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

След това ще направим същото за стойностите от живата база данни. Ще се присъединим към таблиците country и city така че имаме всички необходими данни в този списък:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Сега ще преминем през списъка, съдържащ данните от живата база данни. За всеки запис ще сравняваме стойности (city_name , postal_code и country_name ). Ако не намерим такива стойности, ще добавим нов запис в dim_city таблица.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

За да определим дали стойността вече е в DWH, тествахме комбинация от атрибути, които трябва да бъдат уникални. (Първичният ключ от живата база данни не ни помага много тук.) Можем да използваме подобен код, за да актуализираме други речници. Това не е най-хубавото решение, но все пак е доста елегантно. И ще направи точно това, от което се нуждаем.

Актуализиране на fact_customer_subscribed Таблица

Цел:Ако имаме стари данни за вчерашната дата, първо ги изтрийте. Добавете вчерашните данни в DWH – независимо дали сме изтрили нещо в предишната стъпка или не.

След като актуализираме всички таблици с измерения, трябва да актуализираме таблиците с факти. В нашия скрипт ще актуализираме само една таблица с факти. Разсъжденията са същите като в предишния раздел:актуализирането на други таблици ще следва същия модел, така че най-вече ще повторим кода.

Преди да вмъкнем стойности в таблицата с факти, трябва да знаем стойностите на свързаните ключове от таблиците с измерения. За да направим това, отново ще заредим измерения в списъци и ще ги сравним със стойности от живата база данни.

Първото нещо, което ще направим, е да заредим клиента и fact_customer_subscribed таблици в обекти:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Сега ще трябва да намерим ключове за свързаното времево измерение. Тъй като винаги вмъкваме данни за вчера, ще търсим тази дата в dim_time таблица и използвайте нейния идентификатор. Заявката връща 1 ред и идентификаторът е на първа позиция (индексът започва от 0, така че това е result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

За това време ще изтрием всички свързани записи от таблицата с факти:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Добре, сега имаме идентификационния номер на измерението за време, съхранен в dim_time_id променлива. Това беше лесно, защото можем да имаме само една стойност на времевото измерение. Историята ще бъде различна за градското измерение. Първо, ще заредим всички стойностите, от които се нуждаем – стойности, които описват уникално града (не идентификационния номер), и обобщени стойности:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Има няколко неща, които бих искал да подчертая относно заявката по-горе:

  • func.sum(...) е SUM(...) от „стандартен SQL“.
  • case(...) синтаксисът използва and_ преди условия, а не между тях.
  • .label(...) функционира като псевдоним на SQL AS.
  • Използваме \ за да преминете към следващия ред и да увеличите четливостта на заявката. (Повярвайте ми, доста е нечетливо без наклонената черта - пробвах го :) )
  • .group_by(...) играе ролята на GROUP BY на SQL.

След това ще прегледаме всеки запис, върнат с помощта на предишната заявка. За всеки запис ще сравняваме стойности, които уникално дефинират град (city_name , postal_code , country_name ) със стойностите, съхранени в списъка, създаден от DWH dim_city маса. Ако и трите стойности съвпадат, ще съхраним идентификатора от списъка и ще го използваме при вмъкване на нови данни. По този начин за всеки запис ще имаме идентификатори за двете измерения:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

И това е всичко. Актуализирахме нашия DWH. Сценарият ще бъде много по-дълъг, ако актуализираме всички таблици с измерения и факти. Сложността също би била по-голяма, когато таблица с факти е свързана с повече таблици с измерения. В такъв случай ще ни трябва за цикъл за всяка таблица с размери.

Това не работи!

Бях много разочарован, когато написах този скрипт и след това разбрах, че нещо подобно няма да работи:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

В този пример се опитвам да използвам таблици от две различни бази данни. Ако установим две отделни връзки, първата връзка няма да „вижда“ таблици от друга връзка. Ако се свържем директно със сървъра, а не с база данни, няма да можем да заредим таблици.

Докато това се промени (надявам се скоро), ще трябва да използвате някаква структура (например това, което направихме днес), за да комуникирате между двете бази данни. Това усложнява кода, защото трябва да замените една заявка с два списъка и вложени за цикли.

Споделете вашите мисли за SQLAlchemy и Python

Това беше последната статия от тази поредица. Но кой знае? Може би ще опитаме друг подход в предстоящите статии, така че бъдете на линия. Междувременно, моля, споделете вашите мисли за SQLAlchemy и Python в комбинация с бази данни. Какво мислите, че ни липсва в тази статия? Какво бихте добавили? Кажете ни в коментарите по-долу.

Можете да изтеглите пълния скрипт, който използвахме в тази статия тук.

И специални благодарности към Dirk J Bosman (@dirkjobosman), който препоръча тази серия от статии.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. MySQL връща само един ред

  2. Как да съхранявате снимки в MySQL база данни

  3. Как да автоматизирате заявките за обобщена таблица в MySQL

  4. Зареждане на .sql файлове от PHP

  5. Какво връща успешното MySQL DELETE? Как да проверя дали DELETE е успешно?