Тази статия ще бъде интересна за тези, които често трябва да се занимават с интегриране на данни.
Въведение
Да приемем, че има база данни, в която потребителите винаги променят данните (актуализират или премахват). Може би тази база данни се използва от голямо приложение, което не позволява промяна на структурата на таблицата. Задачата е да се зареждат данни от тази база данни в друга база данни на различен сървър от време на време. Най-простият начин за справяне с проблема е да заредите новите данни от изходна база данни в целева база данни с предварително почистване на целевата база данни. Можете да използвате този метод, стига времето за зареждане на данни да е приемливо и не надвишава предварително зададените срокове. Ами ако зареждането на данни отнема няколко дни? В допълнение, нестабилните комуникационни канали водят до ситуация, когато натоварването на данни спира и се рестартира. Ако се сблъскате с тези препятствия, предлагам да разгледате един от алгоритмите за „презареждане на данни“. Това означава, че са извършени само модификации на данните след зареждането на последното зареждане.
CDC
В SQL Server 2008 Microsoft въведе механизъм за проследяване на данни, наречен Change Data Capture (CDC). Най-общо казано, целта на този механизъм е активирането на CDC за всяка таблица на база данни да създаде системна таблица в същата база данни с подобно име като оригиналната таблица (схемата ще бъде както следва:'cdc' като префикс плюс старо име на схема плюс ”_” и краят “_CT”. Например, оригиналната таблица е dbo.Example, тогава системната таблица ще се нарича cdc.dbo_Example_CT). Той ще съхранява всички данни, които са били променени.
Всъщност, за да копаем по-дълбоко в CDC, разгледайте примера. Но първо се уверете, че SQL агент, който използва CDC, работи на тестовия екземпляр на SQL Server.
Освен това ще разгледаме скрипт, който създава база данни и тестова таблица, попълва тази таблица с данни и активира CDC за тази таблица.
За да разберем и опростим задачата, ще използваме един екземпляр на SQL Server, без да разпространяваме изходната и целевата база данни към различни сървъри.
използвайте mastergo-- създайте изходна база данни, ако не съществува (изберете * от sys.databases, където име ='db_src_cdc') създайте база данни db_src_cdcgouse db_src_cdcgo-- активирайте CDC, ако е забранено, ако не съществува (изберете * от sys.databases, където =db_name() и is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- създайте роля за таблици с CDC, ако не съществува (изберете * от sys.sysusers, където name ='CDC_Reader' и issqlrole=1) създайте роля CDC_Readergo- създайте tableif- object_id('dbo.Example','U') е нулева създаване на таблица dbo.Example ( ID int identity constraint PK_Example първичен ключ, Title varchar(200) не null )go-- попълнете tableinsert dbo.Example (Title) values( 'One'),('Two'),('Three'),('Four'),('Five');go-- активирайте CDC за таблицата, ако не съществува (изберете * от sys.tables, където is_tracked_by_cdc =1 и name ='Пример') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Пример', @role_name ='CDC_Reader'go-- попълнете таблицата с някои данни. Ще променим или изтрием нещоupdate dbo.Exampleset Title =reverse(Title)where ID в (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Примерни (ID, Заглавие) стойности (1,'Едно'),(6,'Шест');задайте identity_insert dbo.Example off;go
Сега, нека да разгледаме какво имаме след изпълнението на този скрипт в таблиците dbo.Example и cdc.dbo_Example_CT (трябва да се отбележи, че CDC е асинхронен. Данните се попълват в таблиците, където проследяването на промените се съхранява след определен период от време ).
изберете * от dbo.Example;
Заглавие на ID---- --------------------- 1 Един 3 eerhT 4 ruoF 5 Пет 6 Шест
изберете row_number() над ( разделяне по реда на ID от __$start_lsn desc, __$seqval desc ) като __$rn, *от cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Заглавие------ --------------------- - ----------- --------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 четири 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 RUOF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03
Разгледайте подробно структурата на таблицата, в която се съхранява проследяването на промените. Полетата __ $start_lsn и __ $seqval са съответно LSN (последователен номер на журнала в базата данни) и номерът на транзакцията в рамките на транзакцията. В тези полета има важно свойство, а именно, можем да сме сигурни, че записът с по-висок LSN ще бъде извършен по-късно. Благодарение на това свойство можем лесно да получим най-новото състояние на всеки запис в заявката, като филтрираме нашия избор по условието – където __ $ rn =1.
Полето __$operation съдържа кода на транзакцията:
- 1 – записът е изтрит
- 2 – записът е вмъкнат
- 3, 4 – записът се актуализира. Старите данни преди актуализацията са 3, новите данни са 4.
В допълнение към служебните полета с префикс «__$», полетата на оригиналната таблица са напълно дублирани. Тази информация ни е достатъчна, за да преминем към постепенното натоварване.
Настройване на база данни за зареждане на данни
Създайте таблица в нашата тестова целева база данни, в която ще бъдат заредени данните, както и допълнителна таблица за съхраняване на данни за дневника за зареждане.
използвайте mastergo-- създайте целева база данни, ако не съществува (изберете * от sys.databases, където име ='db_dst_cdc') създайте база данни db_dst_cdcgouse db_dst_cdcgo-- създайте таблица, ако object_id('dbo.Example','U') е null create table dbo.Example ( ID int ограничение PK_Example първичен ключ, Title varchar(200) не null )go-- създайте таблица за съхраняване на натоварването logif object_id('dbo.log_cdc','U') е нулева създаване на таблица dbo .log_cdc ( table_name nvarchar(512) не е null, dt datetime не е null default getdate(), lsn binary(10) не е null default(0x0), ограничение pk_log_cdc първичен ключ (table_name,dt desc) )go
Бих искал да обърна вниманието ви към полетата на таблицата LOG_CDC:
- TABLE_NAME съхранява информация за това каква таблица е заредена (възможно е да се заредят няколко таблици в бъдеще, от различни бази данни или дори от различни сървъри; форматът на таблицата е „SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME“
- DT е поле за дата и час на зареждане, което е по избор за допълнителното натоварване. Въпреки това ще бъде полезно за одитиране на зареждането.
- LSN – след като таблицата се зареди, трябва да съхраняваме информация за мястото, откъдето да започнем следващото зареждане, ако е необходимо. Съответно, след всяко зареждане добавяме последния (максимален) __ $ start_lsn в тази колона.
Алгоритъм за зареждане на данни
Както е описано по-горе, използвайки заявката, можем да получим най-новото състояние на таблицата с помощта на функциите на прозореца. Ако знаем LSN за последното зареждане, следващия път, когато заредим, можем да филтрираме от източника всички данни, чиито промени са по-високи от съхранения LSN, ако е имало поне едно пълно предишно зареждане:
с incr_Example като ( изберете row_number() над ( разделяне по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$rn, * от db_src_cdc.cdc.dbo_Example_CT където __$operation_ <_$ 3 start_lsn> @lsn)изберете * от incr_Example
След това можем да получим всички записи за пълното натоварване, ако LSN за натоварване не е съхранен:
с incr_Example като ( изберете row_number() над ( разделяне по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$rn, * от db_src_cdc.cdc.dbo_Example_CT където __$operation_ <_$ 3 start_lsn> @lsn), full_Example като (изберете * от db_src_cdc.dbo.Пример, където @lsn е null) изберете идентификатор, заглавие, __$operationfrom incr_Example, където __$rn =1union allselect ID, заглавие, 2 като ___$operationfrПо този начин, в зависимост от стойността @LSN, тази заявка ще покаже или всички последни промени (заобикаляйки междинните) със статус Премахнато или не, или всички данни от оригиналната таблица, добавяйки статус 2 (нов запис) – това поле се използва само за обединяване на две селекции. С тази заявка можем лесно да приложим или пълно натоварване, или презареждане с помощта на командата MERGE (започвайки с версията на SQL 2008).
За да избегнете тесни места, които могат да създадат алтернативни процеси и да заредите съвпадащи данни от различни таблици (в бъдеще ще заредим няколко таблици и е възможно да има релационни връзки между тях), предлагам да използвате моментна снимка на DB на изходната база данни ( друга функция на SQL 2008).
Пълният текст на натоварването е както следва:
[expand title=”Код”]
/* Алгоритъм за зареждане на данни*/-- създаване на моментна снимка на база данни, ако съществува (изберете * от sys.databases, където name ='db_src_cdc_ss') пуснете база данни db_src_cdc_ss;declare @query nvarchar(max);select @query =N' създайте база данни db_src_cdc_ss на ( име =N'''+име+ ''', име на файл =N'''+[име на файл]+'.ss'') като моментна снимка на db_src_cdc'от db_src_cdc.sys.sysfiles, където groupid =1; exec ( @query );-- прочетете LSN от предишния loaddeclare @lsn binary(10) =(изберете max(lsn) от db_dst_cdc.dbo.log_cdc, където table_name ='localhost.db_src_cdc.dbo.Example');-- изчистете таблица преди пълното зареждане, ако @lsn е нулева съкращаване на таблицата db_dst_cdc.dbo.Example;-- заредете процес с incr_Example като (изберете row_number() над ( дял по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$ , * от db_src_cdc_ss.cdc.dbo_Example_CT, където __$operation <> 3 и __$start_lsn> @lsn), full_Example като (изберете * от db_src_cdc_ss.dbo.Пример, където @lsn е нулев идентификатор, cc select е null, c Заглавие, __$операция от incr_Example, където __$rn =1 обединение всички изберете идентификатор, заглавие, 2 като __$операция от full_Example) обединете db_dst_cdc.dbo. Пример като trg, като използвате cte_Example като src на trg.ID=src.ID, когато съвпадат и __$operation =1 след това изтрийте при съвпадение и __$operation <> 1 след това актуализирайте set trg.Title =src.Title, когато не съответства на целта и __$operation <> 1, след което вмъкнете (ID, Title) стойности (src.ID, src .Title);-- маркирайте края на процеса на зареждане и най-новите LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn)values ('localhost.db_src_cdc.dbo.Example', isnull((изберете от max(__$start_lsn) db_src_cdc_ss.cdc.dbo_Example_CT),0))-- изтрийте снимката на базата данни, ако съществува (изберете * от sys.databases, където name ='db_src_cdc_ss') пуснете база данни db_src_cdc_ss[/expand]