В предишните две части представихме модела на база данни на живо за базиран на абонамент бизнес и склад за данни (DWH), който бихме могли да използваме за отчитане. Въпреки че е очевидно, че те трябва да работят заедно, нямаше връзка между тези два модела. Днес ще направим следващата стъпка и ще напишем кода за прехвърляне на данни от живата база данни в нашия DWH.
Моделите на данни
Преди да се потопим в кода, нека си припомним двата модела, с които ще работим. Първо е моделът на транзакционните данни, който ще използваме, за да съхраняваме данните си в реално време. Като се има предвид, че управляваме бизнес, базиран на абонаменти, ще трябва да съхраняваме подробности за клиентите и абонамента, поръчките на клиентите и статусите на поръчките.
Наистина можем да добавим много към този модел, като проследяване на плащания и съхраняване на исторически данни (особено промени в данните за клиенти и абонаменти). За да подчертая ETL процеса (извличане, трансформиране и зареждане) обаче, искам да запазя този модел възможно най-прост.
Използването на модел на транзакционни данни като база данни за отчитане може да работи в някои случаи, но няма да работи за всички случаи. Вече споменахме това, но си струва да го повторим. Ако искаме да отделим задачите си за отчитане от нашите процеси в реално време, трябва да създадем някакъв вид база данни за отчети. Склад за данни е едно решение.
Нашият DWH е съсредоточен около четири таблици с факти. Първите две проследяват броя на клиентите и абонаментите на дневно ниво. Останалите две проследяват броя на доставките и продуктите, включени в тези доставки.
Моето предположение е, че ще изпълняваме нашия ETL процес веднъж на ден. Първо, ще попълним таблици с измерения с нови стойности (където е необходимо). След това ще попълним таблици с факти.
За да избегна ненужни повторения, ще демонстрирам само кода, който ще попълни първите две таблици с измерения и първите две таблици с факти. Останалите таблици могат да бъдат попълнени с помощта на много подобен код. Насърчавам ви сами да запишете кода. Няма по-добър начин да научите нещо ново от това да го опитате.
Идеята:Таблици с размери
Общата идея е да създадем съхранени процедури, които можем редовно да използваме за попълване на DWH - таблици с измерения, както и таблици с факти. Тези процедури ще прехвърлят данни между две бази данни на един и същ сървър. Това означава, че някои заявки в тези процедури ще използват таблици от двете бази данни. Това се очаква; трябва да сравним състоянието на DWH с живата DB и да направим промени в DWH според това, което се случва в живата DB.
В нашия DWH имаме четири таблици с измерения:dim_time
, dim_city
, dim_product
и dim_delivery_status
.
Измерението за време се попълва чрез добавяне на предишната дата. Основното предположение е, че ще изпълняваме тази процедура ежедневно, след приключване на работата.
Градът и размерите на продукта ще зависят от текущите стойности, съхранени в city
и product
речници в живата база данни. Ако добавим нещо към тези речници, тогава нови стойности ще бъдат добавени към таблиците с измерения при следващата актуализация на DWH.
Последната таблица с измерения е dim_delivery_status
маса. Няма да се актуализира, защото съдържа само три стойности по подразбиране. Една доставка е в процес на транспортиране, анулирана или доставена.
Идеята:таблици с факти
Попълването на таблици с факти всъщност е истинската работа. Докато речниците в живата база данни не съдържат атрибут за времеви отпечатък, таблиците с данни, вмъкнати в резултат на нашите операции, го правят. Ще забележите два атрибута на времеви отпечатък, time_inserted
и time_updated
, в модела на данните.
Отново предполагам, че успешно ще стартираме импортирането на DWH веднъж на ден. Това ни позволява да обобщаваме данните на ежедневно ниво. Ще преброим броя на активните и анулирани клиенти и абонаменти, както и доставките и доставените продукти за тази дата.
Нашият модел на живо работи добре, ако стартираме процедура за вмъкване след COB (приключване на бизнеса). Все пак, ако искаме повече гъвкавост, трябва да направим някои промени в модела. Една такава промяна може да бъде наличието на отделна таблица с история за проследяване на точния момент, когато са се променили някакви данни, свързани с клиенти или абонаменти. С настоящата ни организация ще знаем, че промяната се е случила, но няма да знаем дали е имало промени преди тази (например клиент, анулиран вчера, активирал отново акаунта си след полунощ и след това анулиран отново днес) .
Попълване на таблици с размери
Както споменахме по-горе, ще приема предположението, че ще стартираме импортирането на DWH точно веднъж на ден. Ако това не е така, ще ни трябва допълнителен код, за да изтрием нововмъкнатите данни от таблиците с измерения и факти. За таблиците с измерения това ще бъде ограничено до изтриване на дадена дата.
Първо, ще проверим дали дадената дата съществува в dim_time
маса. Ако не, ще добавим нов ред към таблицата; ако е така, не е нужно да правим нищо. В повечето случаи всички дати се вмъкват по време на първоначалното производствено внедряване. Но ще използвам този пример за образователни цели.
За dim_city
и dim_product
размери, ще добавя само всички нови стойности, които открия в city
и product
маси. Няма да правя никакви изтривания, защото всички по-рано вмъкнати стойности могат да бъдат посочени в някаква таблица с факти. Бихме могли да отидем с меко изтриване, напр. с „активен“ флаг, който можем да включваме и изключваме.
За последната таблица dim_delivery_status
, няма да направя нищо, защото винаги ще съдържа едни и същи три стойности.
Кодът по-долу създава процедура, която ще попълни таблиците с измерения dim_time
и dim_city
.
За измерението време ще добавя вчерашна дата. Предполагам, че ETL процесът започва веднага след полунощ. Ще проверя дали това измерение вече съществува и ако не, тогава ще добавя новата дата в таблицата.
За измерението на града ще използвам LEFT JOIN, за да обединя данни от базата данни на живо и базата данни DWH, за да определя кои редове липсват. След това ще добавя само всички липсващи данни към таблицата с измерения. Струва си да се спомене, че има няколко начина да проверите дали данните са променени. Този процес се нарича събиране на данни за промяна или CDC. Често срещан метод е проверка за актуализирани времеви печати или версии. Има няколко допълнителни начина, но те са извън обхвата на тази статия.
Нека сега да разгледаме кода, който е написан с MySQL синтаксис .
DROP PROCEDURE IF EXISTS p_update_dimensions// CREATE PROCEDURE p_update_dimensions () BEGIN SET @time_exists = 0; SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates dimension tables with new values -- dim_time SET @time_exists = (SELECT COUNT(*) FROM subscription_dwh.dim_time dim_time WHERE dim_time.time_date = @time_date); IF (@time_exists = 0) THEN INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) SELECT @time_date AS time_date, YEAR(@time_date) AS time_year, MONTH(@time_date) AS time_month, WEEK(@time_date) AS time_week, WEEKDAY(@time_date) AS time_weekday, NOW() AS ts; END IF; -- dim_city INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() FROM subscription_live.city city_live INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name WHERE city_dwh.id IS NULL; END// -- CALL p_update_dimensions ()
Изпълнение на тази процедура - която правим с помощта на коментирана процедура ОБЗАВЕТЕ -- вмъква нова дата и всички липсващи градове в таблиците с измерения. Опитайте да добавите свой собствен код, за да попълните останалите две таблици с измерения с нови стойности.
Процесът ETL в склад за данни
Основната идея зад съхранението на данни е да съдържа агрегирани данни в желания формат. Разбира се, трябва да знаем този формат, преди дори да започнем изграждането на склада. Ако сме направили всичко, както е планирано, можем да получим всички предимства, които ни предлага DWH. Основното предимство е подобрената производителност при изпълнение на заявки. Нашите заявки работят с по-малко записи (тъй като са агрегирани) и се изпълняват в базата данни за отчитане (а не в активната).
Но преди да можем да правим запитвания, трябва да съхраняваме факти в нашата база данни. Начинът, по който ще направим това, зависи от това какво трябва да направим с нашите данни по-късно. Ако нямаме добра цялостна картина, преди да започнем да изграждаме нашия DWH, скоро може да се окажем в беда! скоро.
Името на този процес е ETL:E =Извличане, T =Трансформиране, L =Зареждане. Той грабва данните, трансформира ги, за да отговарят на структурата на DWH, и ги зарежда в DWH. За да бъдем точни, действителният процес, който ще използваме, е ELT:Извличане, Зареждане, Трансформиране. Тъй като използваме съхранени процедури, ще извлечем данни, ще ги заредим и след това ще ги трансформираме, за да отговорим на нашите нужди. Добре е да знаете, че макар ETL и ELT да са малко по-различни, термините понякога се използват взаимозаменяемо.
Попълване на таблиците с факти
Попълването на таблици с факти е причината да сме тук. Днес ще попълня две таблици с факти, fact_customer_subscribed
таблица и fact_subscription_status
маса. Останалите две таблици с факти можете да опитате като домашна работа.
Преди да преминем към попълване на таблица с факти, трябва да приемем, че таблиците с измерения са попълнени с нови стойности. Попълването на таблиците с факти следва същия модел. Тъй като те имат една и съща структура, ще обясня и двете заедно.
Групираме данните по две измерения:време и град. Времевата величина ще бъде зададена на вчера и ще намерим идентификатора на свързания запис в dim_time
таблица чрез сравняване на дати (последното INNER JOIN в двете заявки).
Идентификационният номер на dim_city
се извлича чрез обединяване на всички атрибути, които образуват УНИКАЛНА комбинация в таблицата с измерения (име на град, пощенски код и име на държава).
В тази заявка ще тестваме стойности с CASE и след това ще ги SUM. За активни и неактивни клиенти не съм тествал датата. Избрах обаче стойности, каквито са за тези полета. За нови и закрити акаунти тествах актуализираното време.
DROP PROCEDURE IF EXISTS p_update_facts// CREATE PROCEDURE p_update_facts () BEGIN SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates fact tables with new values -- fact_customer_subscribed INSERT INTO `fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; -- fact_subscription_status INSERT INTO `fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; END// -- CALL p_update_facts ()
Отново коментирах последния ред. Премахнете коментара и можете да използвате този ред, за да извикате процедурата и да вмъкнете нови стойности. Моля, имайте предвид, че не съм изтрил никакви съществуващи стари стойности, така че тази процедура няма да работи, ако вече имаме стойности за тази дата и град. Това може да бъде решено чрез изтриване преди вмъкване.
Не забравяйте, че трябва да попълним останалите таблици с факти в нашия DWH. Насърчавам ви да опитате това сами!
Друго нещо, което определено бих препоръчал, е поставянето на целия процес в транзакция. Това ще гарантира, че или всички вмъквания са успешни, или нито едно не се прави. Това е много важно, когато искаме да избегнем частично вмъкване на данни, напр. ако имаме множество процедури за вмъкване на измерения и факти и някои от тях си вършат работата, докато други се провалят.
Какво мислите?
Днес видяхме как можем да изпълним ELT/ETL процеса и да заредим данни от жива база данни в хранилище за данни. Въпреки че процесът, който демонстрирахме, е доста опростен, той съдържа всички елементи, необходими за E(извличане) на данните, T(трансформиране) в подходящ формат и накрая L(зареждане) в DWH. Какво мислиш? Моля, кажете ни вашия опит в коментарите по-долу.