Mysql
 sql >> база данни >  >> RDS >> Mysql

Използване на Python и MySQL в ETL процеса

Python е много популярен в наши дни. Тъй като Python е език за програмиране с общо предназначение, той може да се използва и за изпълнение на процеса на извличане, трансформиране, зареждане (ETL). Предлагат се различни ETL модули, но днес ще се придържаме към комбинацията от Python и MySQL. Ще използваме Python за извикване на съхранени процедури и за подготовка и изпълнение на SQL изрази.

Ще използваме два сходни, но различни подхода. Първо, ще извикаме съхранени процедури, които ще свършат цялата работа, а след това ще анализираме как бихме могли да извършим същия процес без съхранени процедури, като използваме MySQL код в Python.

Готов? Преди да се задълбочим, нека разгледаме модела на данни – или моделите на данни, тъй като в тази статия има два от тях.

Моделите на данни

Ще ни трябват два модела на данни, единият за съхраняване на нашите оперативни данни, а другият за съхраняване на нашите отчетни данни.




Първият модел е показан на снимката по-горе. Този модел се използва за съхраняване на оперативни (на живо) данни за бизнес, базиран на абонамент. За повече представа за този модел, моля, разгледайте предишната ни статия, Създаване на DWH, Част първа:Модел на бизнес данни за абонамент.




Разделянето на оперативните и отчетните данни обикновено е много мъдро решение. За да постигнем това разделяне, ще трябва да създадем склад за данни (DWH). Вече направихме това; можете да видите модела на снимката по-горе. Този модел също е описан подробно в публикацията Създаване на DWH, част втора:модел на бизнес данни за абонамент.

И накрая, трябва да извлечем данни от живата база данни, да ги трансформираме и да ги заредим в нашия DWH. Вече направихме това с помощта на SQL съхранени процедури. Можете да намерите описание на това, което искаме да постигнем, заедно с някои примери за код в Създаване на хранилище за данни, част 3:Абонаментен модел на бизнес данни.

Ако имате нужда от допълнителна информация относно DWHs, препоръчваме да прочетете тези статии:

  • Звездната схема
  • Схема на снежинката
  • Звездна схема срещу схема със снежинка.

Нашата задача днес е да заменим SQL съхранените процедури с код на Python. Готови сме да направим малко магия на Python. Нека започнем с използването само на съхранени процедури в Python.

Метод 1:ETL с помощта на съхранени процедури

Преди да започнем да описваме процеса, важно е да споменем, че имаме две бази данни на нашия сървър.

subscription_live базата данни се използва за съхраняване на транзакционни/активни данни, докато subscription_dwh е нашата база данни за отчети (DWH).

Вече описахме съхранените процедури, използвани за актуализиране на таблици с измерения и факти. Те ще четат данни от subscription_live база данни, комбинирайте я с данни в subscription_dwh база данни и вмъкнете нови данни в subscription_dwh база данни. Тези две процедури са:

  • p_update_dimensions – Актуализира таблиците с размери dim_time и dim_city .
  • p_update_facts – Актуализира две таблици с факти, fact_customer_subscribed и fact_subscription_status .

Ако искате да видите пълния код за тези процедури, прочетете Създаване на хранилище за данни, част 3:Модел на бизнес данни за абонамент.

Сега сме готови да напишем прост Python скрипт, който ще се свърже със сървъра и ще извърши ETL процеса. Нека първо да разгледаме целия скрипт (etl_procedures.py ). След това ще обясним най-важните части.

# import MySQL connectorimport mysql.connector# connect to serverconnection =mysql.connector.connect(user='', password='', host='127.0.0.1')print('Свързан към база данни.')cursor =connection.cursor()# Актуализирам dimensionscursor.callproc('subscription_dwh.p_update_dimensions')print('Таблиците с измерения са актуализирани.')# II актуализирам factscursor.callproc('subscription_dwh.p_update_facts')print('Facts')print таблиците са актуализирани.')# commit &close connectioncursor.close()connection.commit()connection.close()print('Прекъснато от базата данни.')

etl_procedures.py

Импортиране на модули и свързване към базата данни

Python използва модули за съхранение на дефиниции и изрази. Можете да използвате съществуващ модул или да напишете свой собствен. Използването на съществуващи модули ще опрости живота ви, защото използвате предварително написан код, но писането на свой собствен модул също е много полезно. Когато излезете от интерпретатора на Python и го стартирате отново, ще загубите функции и променливи, които сте дефинирали по-рано. Разбира се, не искате да въвеждате един и същ код отново и отново. За да избегнете това, можете да съхранявате вашите дефиниции в модул и да ги импортирате в Python.

Обратно към etl_procedures.py . В нашата програма започваме с импортиране на MySQL конектор:

# import MySQL connectorimport mysql.connector

MySQL Connector за Python се използва като стандартизиран драйвер, който се свързва към MySQL сървър/база данни. Ще трябва да го изтеглите и инсталирате, ако не сте го направили преди. Освен свързване към базата данни, той предлага редица методи и свойства за работа с база данни. Ще използваме някои от тях, но можете да проверите пълната документация тук.

След това ще трябва да се свържем с нашата база данни:

# свързване към сървърна връзка =mysql.connector.connect(user='<потребителско име>', парола='<парола>', хост='127.0.0.1')print('Свързан с базата данни.')курсор =връзка. .cursor()

Първият ред ще се свърже със сървър (в този случай се свързвам с моята локална машина) с помощта на вашите идентификационни данни (заменете и с действителни стойности). Докато установявате връзка, можете също да посочите базата данни, към която искате да се свържете, както е показано по-долу:

connection =mysql.connector.connect(user='', password='', host='127.0.0.1', database='<име_на_база_данни>')

Умишлено се свързах само със сървър, а не с конкретна база данни, защото ще използвам две бази данни, разположени на един и същ сървър.

Следващата команда – print – тук е само известие, че сме били успешно свързани. Въпреки че няма програмно значение, може да се използва за отстраняване на грешки в кода, ако нещо се обърка в скрипта.

Последният ред в тази част е:

курсор =връзка.курсор()

Курсорите са структурата на манипулатора, използвана за работа с данните. Ще ги използваме за извличане на данни от базата данни (SELECT), но също и за промяна на данните (INSERT, UPDATE, DELETE). Преди да използваме курсора, трябва да го създадем. И това прави тази линия.

Процедури за повикване

Предишната част беше обща и можеше да се използва за други задачи, свързани с база данни. Следната част от кода е специално за ETL:извикване на нашите съхранени процедури с cursor.callproc команда. Изглежда така:

# 1. актуализиране dimensionscursor.callproc('subscription_dwh.p_update_dimensions')print('Таблиците с измерения са актуализирани.')# 2. актуализиране на factscursor.callproc('subscription_dwh.p_update_facts')printd('Fact tables')printd('Fact.') таблици с факти. /предварително> 

Процедурите по обаждане са до голяма степен разбираеми. След всяко обаждане се добавяше команда за печат. Отново, това просто ни дава известие, че всичко е минало наред.

Отвържете и затворете

Последната част на скрипта записва промените в базата данни и затваря всички използвани обекти:

# commit &close connectioncursor.close()connection.commit()connection.close()print('Прекъснато от базата данни.')

Процедурите по обаждане са до голяма степен разбираеми. След всяко обаждане се добавяше команда за печат. Отново, това просто ни дава известие, че всичко е минало наред.

Тук ангажиментът е от съществено значение; без него няма да има промени в базата данни, дори ако сте извикали процедура или сте изпълнили SQL оператор.

Изпълнение на скрипта

Последното нещо, което трябва да направим, е да изпълним нашия скрипт. Ще използваме следните команди в Python Shell, за да постигнем това:

import osfile_path ='D://python_scripts'os.chdir(file_path)exec(open("etl_procedures.py").read())

Скриптът се изпълнява и всички промени се правят в базата данни съответно. Резултатът може да се види на снимката по-долу.

Метод 2:ETL с помощта на Python и MySQL

Подходът, представен по-горе, не се различава много от подхода за извикване на съхранени процедури директно в MySQL. Единствената разлика е, че сега имаме скрипт, който ще свърши цялата работа вместо нас.

Можем да използваме друг подход:да поставим всичко в скрипта на Python. Ще включим изрази на Python, но също така ще подготвим SQL заявки и ще ги изпълним в базата данни. Базата данни източник (на живо) и базата данни местоназначение (DWH) са същите като в примера със съхранените процедури.

Преди да се задълбочим в това, нека да разгледаме пълния скрипт (etl_queries.py ):

от datetime import date# import MySQL connectorimport mysql.connector# connect to serverconnection =mysql.connector.connect(user='', password='', host='127.0.0.1')print ('Свързан с базата данни.')# 1. актуализиране на измерения# 1.1 актуализиране на dim_time# date - yesterdayyesterday =date.fromordinal(date.today().toordinal()-1)yesterday_str ='"' + str(вчера) + ' "'# тест дали датата вече е в таблицатаcursor =connection.cursor()query =( "SELECT COUNT(*) " "ОТ subscription_dwh.dim_time " "КЪДЕ time_date =" + yesterday_str)cursor.execute(query)result =cursor .fetchall()yesterday_subscription_count =int(result[0][0])if yesterday_subscription_count ==0:yesterday_year ='ГОДИНА("' + str(вчера) + '")' yesterday_month ='МЕСЕЦ("' + str(вчера) ) + '")' yesterday_week ='WEEK("' + str(вчера) + '")' yesterday_weekday ='WEEKDAY("' + str(вчера) + '")' query =( "INSERT INTO subscription_dwh.`dim_time". `(`час_дата`, `час_година`, `час_месец`, `час_седмица` , `time_weekday`, `ts`) " " СТОЙНОСТИ (" + yesterday_str + ", " + yesterday_year + ", " + yesterday_month + ", " + yesterday_week + ", " + yesterday_weekday + ", Now())") курсор .execute(query)# 1.2 update dim_cityquery =( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "ИЗБЕРЕТЕ city_live.city_name, city_live.postal_code, country_live. , Now() " "ОТ subscription_live.city city_live " "ВЪТРЕШНО ПРИСЪЕДИНЯВАНЕ subscription_live.country country_live ON city_live.country_id =country_live.id " "LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name. city_code_city_de. пощенски_код И country_live.country_name =city_dwh.country_name " "КЪДЕ city_dwh.id Е NULL")cursor.execute(query)print('Таблиците с измерения са актуализирани.')# 2. факти за актуализиране# 2.1 абонирани клиенти за актуализиране# изтриване на стари данни за same datequery =( "ИЗТРИВАНЕ на subscription_dwh.`fact_customer_subscribed`.* " "ОТ subscription_dwh.`fa ct_customer_subscribed` " "INNER JOIN subscription_dwh.`dim_time` ON subscription_dwh.`fact_customer_subscribed`.`dim_time_id` =subscription_dwh.`dim_time`.`id` " "WHERE subscription_dwh.`id` " "WHERE subscription_dwh. execute(query)# вмъкнете new dataquery =( "INSERT INTO subscription_dwh.`fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_new`, `daily_new`, `daily`ca) city_dwh.id КАТО dim_ctiy_id, time_dwh.id КАТО dim_time_id, SUM(CASE, КОГАТО customer_live.active =1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active =0 THEN 1 ELSE 0 active END, ASUM total_ СЛУЧАЙ, КОГАТО customer_live.active =1 И ДАТА(customer_live.time_updated) =@time_date THEN 1 ELSE 0 END) КАТО daily_new, SUM(CASE WHEN customer_live.active =0 И DATE(customer_live.time_updated) =@time_date THEN END_date ) AS daily_cancelled, MIN(NOW()) AS ts " "FROM subscription_live.`customer` customer_live " "INNER JOIN subscri ption_live.`city` city_live ON customer_live.city_id =city_live.id " "INNER JOIN subscription_live.`country` country_live ON city_live.country_id =country_live.id " "INNER JOIN subscription_dwh.dim_city city_live.name_city_city_city. .postal_code =city_dwh.postal_code И country_live.country_name =city_dwh.country_name " "INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date =" + yesterday_str + " "GROUP BY city_dwh.country_name (" "ГРУПА BY city_dwh.exe). )# 2.2 актуализиране на статусите на абонамента# изтриване на стари данни за една и съща datequery =( "ИЗТРИВАНЕ на subscription_dwh.`fact_subscription_status`.* " "ОТ subscription_dwh.`fact_subscription_status` " "INNER JOIN subscription_dwh.`dim`h`subscription_dwh.`dim`h`f`s. dim_time_id` =subscription_dwh.`dim_time`.`id` " "КЪДЕ subscription_dwh.`dim_time`.`time_date` =" + yesterday_str)cursor.execute(query)# insert new dataquery =( "INSERT INTO subscription_dwh.`fact _subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_cancelled`, `ts`) " "ИЗБЕРЕТЕ city_dwh.id AS dim_ctiy_id, time_dim SEN_time_dwh_ subscription_live.active =1 THEN 1 ELSE 0 КРАЙ) КАТО общо_активен, SUM(СЛУЧАЙ, КОГАТО subscription_live.active =0 СЛЕД 1 ДРУГА 0 КРАЙ) КАТО общо_неактивен, SUM(СЛУЧАЙ, КОГАТО subscription_live.active =1 И ДАТА(time_upscription_live) time_date THEN 1 ELSE 0 END) КАТО daily_new, SUM(CASE WHEN subscription_live.active =0 И DATE(subscription_live.time_updated) =@time_date THEN 1 ELSE 0 END) КАТО daily_cancelled, MIN(NOW()LIve AS subscription "_ .`customer` customer_live " "INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id =customer_live.id " "INNER JOIN subscription_live.`city` city_live ON customer_live.city_id ="INNER JOIN" subscription_live. country_live ON city_live.country_id =country_live.id " "INNER ПРИСЪЕДИНЕТЕ се subscription_dwh.dim_city city_dwh ON city_live.city_name =city_dwh.city_name И city_live.postal_code =city_dwh.postal_code AND country_live.country_name =city_dwh.country_name " "INNER JOIN subscription_name " "INNER JOIN subscription_day "daster_time "dw_time_dwh "dw_time BY city_dwh.id, time_dwh.id")cursor.execute(query)print('Таблиците с факти са актуализирани.')# commit &close connectioncursor.close()connection.commit()connection.close()print('Прекъснати от базата данни .')

etl_queries.py

Импортиране на модули и свързване към базата данни

Още веднъж ще трябва да импортираме MySQL, използвайки следния код:

импортиране на mysql.connector

Ще импортираме и модула за дата и час, както е показано по-долу. Това ни трябва за операции, свързани с дата в Python:

от дата и дата на импортиране

Процесът за свързване с базата данни е същият като в предишния пример.

Актуализиране на измерението dim_time

За да актуализирате dim_time таблица, ще трябва да проверим дали стойността (за вчера) вече е в таблицата. Ще трябва да използваме функциите за дата на Python (вместо SQL), за да направим това:

# дата - yesterdayyesterday =date.fromordinal(date.today().toordinal()-1)yesterday_str ='"' + str(вчера) + '"'

Първият ред от кода ще върне вчерашната дата в променливата дата, докато вторият ред ще съхранява тази стойност като низ. Ще ни трябва това като низ, защото ще го свържем с друг низ, когато изградим SQL заявката.

След това ще трябва да тестваме дали тази дата вече е в dim_time маса. След като декларираме курсор, ще подготвим SQL заявката. За да изпълним заявката, ще използваме cursor.execute команда:

# тест дали датата вече е в таблицатаcursor =connection.cursor()query =( "SELECT COUNT(*) " "FROM subscription_dwh.dim_time " "WHERE time_date =" + yesterday_str)cursor.execute(query)'" '

Ще съхраняваме резултата от заявката в резултата променлива. Резултатът ще има или 0, или 1 ред, така че можем да тестваме първата колона на първия ред. Той ще съдържа или 0, или 1. (Не забравяйте, че можем да имаме една и съща дата само веднъж в таблица с измерения.)

Ако датата вече не е в таблицата, ще подготвим низовете, които ще бъдат част от SQL заявката:

result =cursor.fetchall()yesterday_subscription_count =int(result[0][0])if yesterday_subscription_count ==0:yesterday_year ='ГОДИНА("' + str(yesterday) + '")' yesterday_month ='МЕСЕЦ( "' + str(вчера) + '")' yesterday_week ='СЕДМИЦА("' + str(вчера) + '")' yesterday_weekday ='WEEKDAY("' + str(вчера) + '")'

Накрая ще изградим заявка и ще я изпълним. Това ще актуализира dim_time таблица, след като е ангажиран. Моля, обърнете внимание, че използвах пълния път към таблицата, включително името на базата данни (subscription_dwh ).

 query =( "INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) " " VALUES (" + yesterday_str + ", " + yesterday_year + ", " + yesterday_month + ", " + yesterday_week + ", " + yesterday_weekday + ", Now())") cursor.execute(query)

Актуализиране на измерението dim_city

dim_city таблицата е още по-проста, защото не е нужно да тестваме нищо преди вмъкването. Всъщност ще включим този тест в SQL заявката.

# 1.2 update dim_cityquery =( "INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) " "SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now, country_live.country_name, () " "ОТ subscription_live.city city_live " "INNER JOIN subscription_live.country country_live ON city_live.country_id =country_live.id " "LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name =city_name =city_dwcode city_ANDw_code city_ANDw. country_live.country_name =city_dwh.country_name " "КЪДЕ city_dwh.id Е NULL")cursor.execute(query)

Тук подготвяме изпълнение на SQL заявката. Забележете, че отново използвах пълните пътища към таблиците, включително имената на двете бази данни (subscription_live и subscription_dwh ).

Актуализиране на таблиците с факти

Последното нещо, което трябва да направим, е да актуализираме нашите таблици с факти. Процесът е почти същият като актуализирането на таблици с измерения:ние подготвяме заявки и ги изпълняваме. Тези заявки са много по-сложни, но са същите като тези, използвани в съхранените процедури.

Добавихме едно подобрение в сравнение със съхранените процедури:изтриване на съществуващите данни за същата дата в таблицата с факти. Това ще ни позволи да стартираме скрипт няколко пъти за една и съща дата. В края ще трябва да извършим транзакцията и да затворим всички обекти и връзката.

Изпълнение на скрипта

Имаме малка промяна в тази част, която извиква различен скрипт:

- import os- file_path ='D://python_scripts'- os.chdir(file_path)- exec(open("etl_queries.py").read())

Тъй като използвахме едни и същи съобщения и скриптът завърши успешно, резултатът е същият:

Как бихте използвали Python в ETL?

Днес видяхме един пример за изпълнение на ETL процеса със скрипт на Python. Има и други начини да направите това, напр. редица решения с отворен код, които използват Python библиотеки за работа с бази данни и изпълнение на ETL процеса. В следващата статия ще играем с един от тях. Междувременно не се колебайте да споделите опита си с Python и ETL.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Регистрирайте всички заявки в mysql

  2. Параметри на свързване за клауза WHERE IN със ЗНП

  3. Има ли нещо нередно с присъединяванията, които не използват ключовата дума JOIN в SQL или MySQL?

  4. Как да намерите ASCII кода за даден символ в MySQL

  5. Как да избегнем вмъкването на дублиращи се записи в MySQL