Sqlserver
 sql >> база данни >  >> RDS >> Sqlserver

Внедряване на инкрементално натоварване с помощта на заснемане на промяна на данни в SQL Server

Тази статия ще бъде интересна за тези, които често трябва да се занимават с интегриране на данни.

Въведение

Да приемем, че има база данни, в която потребителите винаги променят данните (актуализират или премахват). Може би тази база данни се използва от голямо приложение, което не позволява промяна на структурата на таблицата. Задачата е да се зареждат данни от тази база данни в друга база данни на различен сървър от време на време. Най-простият начин за справяне с проблема е да заредите новите данни от изходна база данни в целева база данни с предварително почистване на целевата база данни. Можете да използвате този метод, стига времето за зареждане на данни да е приемливо и не надвишава предварително зададените срокове. Ами ако зареждането на данни отнема няколко дни? В допълнение, нестабилните комуникационни канали водят до ситуация, когато натоварването на данни спира и се рестартира. Ако се сблъскате с тези препятствия, предлагам да разгледате един от алгоритмите за „презареждане на данни“. Това означава, че са извършени само модификации на данните след зареждането на последното зареждане.

CDC

В SQL Server 2008 Microsoft въведе механизъм за проследяване на данни, наречен Change Data Capture (CDC). Най-общо казано, целта на този механизъм е активирането на CDC за всяка таблица на база данни да създаде системна таблица в същата база данни с подобно име като оригиналната таблица (схемата ще бъде както следва:'cdc' като префикс плюс старо име на схема плюс ”_” и краят “_CT”. Например, оригиналната таблица е dbo.Example, тогава системната таблица ще се нарича cdc.dbo_Example_CT). Той ще съхранява всички данни, които са били променени.

Всъщност, за да копаем по-дълбоко в CDC, разгледайте примера. Но първо се уверете, че SQL агент, който използва CDC, работи на тестовия екземпляр на SQL Server.

Освен това ще разгледаме скрипт, който създава база данни и тестова таблица, попълва тази таблица с данни и активира CDC за тази таблица.

За да разберем и опростим задачата, ще използваме един екземпляр на SQL Server, без да разпространяваме изходната и целевата база данни към различни сървъри.

използвайте mastergo-- създайте изходна база данни, ако не съществува (изберете * от sys.databases, където име ='db_src_cdc') създайте база данни db_src_cdcgouse db_src_cdcgo-- активирайте CDC, ако е забранено, ако не съществува (изберете * от sys.databases, където =db_name() и is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- създайте роля за таблици с CDC, ако не съществува (изберете * от sys.sysusers, където name ='CDC_Reader' и issqlrole=1) създайте роля CDC_Readergo- създайте tableif- object_id('dbo.Example','U') е нулева създаване на таблица dbo.Example ( ID int identity constraint PK_Example първичен ключ, Title varchar(200) не null )go-- попълнете tableinsert dbo.Example (Title) values( 'One'),('Two'),('Three'),('Four'),('Five');go-- активирайте CDC за таблицата, ако не съществува (изберете * от sys.tables, където is_tracked_by_cdc =1 и name ='Пример') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Пример', @role_name ='CDC_Reader'go-- попълнете таблицата с някои данни. Ще променим или изтрием нещоupdate dbo.Exampleset Title =reverse(Title)where ID в (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Примерни (ID, Заглавие) стойности (1,'Едно'),(6,'Шест');задайте identity_insert dbo.Example off;go

Сега, нека да разгледаме какво имаме след изпълнението на този скрипт в таблиците dbo.Example и cdc.dbo_Example_CT (трябва да се отбележи, че CDC е асинхронен. Данните се попълват в таблиците, където проследяването на промените се съхранява след определен период от време ).

изберете * от dbo.Example;
Заглавие на ID---- --------------------- 1 Един 3 eerhT 4 ruoF 5 Пет 6 Шест
изберете row_number() над ( разделяне по реда на ID от __$start_lsn desc, __$seqval desc ) като __$rn, *от cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Заглавие------ --------------------- - ----------- --------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 четири 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 RUOF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03 

Разгледайте подробно структурата на таблицата, в която се съхранява проследяването на промените. Полетата __ $start_lsn и __ $seqval са съответно LSN (последователен номер на журнала в базата данни) и номерът на транзакцията в рамките на транзакцията. В тези полета има важно свойство, а именно, можем да сме сигурни, че записът с по-висок LSN ще бъде извършен по-късно. Благодарение на това свойство можем лесно да получим най-новото състояние на всеки запис в заявката, като филтрираме нашия избор по условието – където __ $ rn =1.

Полето __$operation съдържа кода на транзакцията:

  • 1 – записът е изтрит
  • 2 – записът е вмъкнат
  • 3, 4 – записът се актуализира. Старите данни преди актуализацията са 3, новите данни са 4.

В допълнение към служебните полета с префикс «__$», полетата на оригиналната таблица са напълно дублирани. Тази информация ни е достатъчна, за да преминем към постепенното натоварване.

Настройване на база данни за зареждане на данни

Създайте таблица в нашата тестова целева база данни, в която ще бъдат заредени данните, както и допълнителна таблица за съхраняване на данни за дневника за зареждане.

използвайте mastergo-- създайте целева база данни, ако не съществува (изберете * от sys.databases, където име ='db_dst_cdc') създайте база данни db_dst_cdcgouse db_dst_cdcgo-- създайте таблица, ако object_id('dbo.Example','U') е null create table dbo.Example ( ID int ограничение PK_Example първичен ключ, Title varchar(200) не null )go-- създайте таблица за съхраняване на натоварването logif object_id('dbo.log_cdc','U') е нулева създаване на таблица dbo .log_cdc ( table_name nvarchar(512) не е null, dt datetime не е null default getdate(), lsn binary(10) не е null default(0x0), ограничение pk_log_cdc първичен ключ (table_name,dt desc) )go

Бих искал да обърна вниманието ви към полетата на таблицата LOG_CDC:

  • TABLE_NAME съхранява информация за това каква таблица е заредена (възможно е да се заредят няколко таблици в бъдеще, от различни бази данни или дори от различни сървъри; форматът на таблицата е „SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME“
  • DT е поле за дата и час на зареждане, което е по избор за допълнителното натоварване. Въпреки това ще бъде полезно за одитиране на зареждането.
  • LSN – след като таблицата се зареди, трябва да съхраняваме информация за мястото, откъдето да започнем следващото зареждане, ако е необходимо. Съответно, след всяко зареждане добавяме последния (максимален) __ $ start_lsn в тази колона.

Алгоритъм за зареждане на данни

Както е описано по-горе, използвайки заявката, можем да получим най-новото състояние на таблицата с помощта на функциите на прозореца. Ако знаем LSN за последното зареждане, следващия път, когато заредим, можем да филтрираме от източника всички данни, чиито промени са по-високи от съхранения LSN, ако е имало поне едно пълно предишно зареждане:

с incr_Example като ( изберете row_number() над ( разделяне по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$rn, * от db_src_cdc.cdc.dbo_Example_CT където __$operation_ <_$ 3 start_lsn> @lsn)изберете * от incr_Example

След това можем да получим всички записи за пълното натоварване, ако LSN за натоварване не е съхранен:

с incr_Example като ( изберете row_number() над ( разделяне по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$rn, * от db_src_cdc.cdc.dbo_Example_CT където __$operation_ <_$ 3 start_lsn> @lsn), full_Example като (изберете * от db_src_cdc.dbo.Пример, където @lsn е null) изберете идентификатор, заглавие, __$operationfrom incr_Example, където __$rn =1union allselect ID, заглавие, 2 като ___$operationfr 

По този начин, в зависимост от стойността @LSN, тази заявка ще покаже или всички последни промени (заобикаляйки междинните) със статус Премахнато или не, или всички данни от оригиналната таблица, добавяйки статус 2 (нов запис) – това поле се използва само за обединяване на две селекции. С тази заявка можем лесно да приложим или пълно натоварване, или презареждане с помощта на командата MERGE (започвайки с версията на SQL 2008).

За да избегнете тесни места, които могат да създадат алтернативни процеси и да заредите съвпадащи данни от различни таблици (в бъдеще ще заредим няколко таблици и е възможно да има релационни връзки между тях), предлагам да използвате моментна снимка на DB на изходната база данни ( друга функция на SQL 2008).

Пълният текст на натоварването е както следва:

[expand title=”Код”]

/* Алгоритъм за зареждане на данни*/-- създаване на моментна снимка на база данни, ако съществува (изберете * от sys.databases, където name ='db_src_cdc_ss') пуснете база данни db_src_cdc_ss;declare @query nvarchar(max);select @query =N' създайте база данни db_src_cdc_ss на ( име =N'''+име+ ''', име на файл =N'''+[име на файл]+'.ss'') като моментна снимка на db_src_cdc'от db_src_cdc.sys.sysfiles, където groupid =1; exec ( @query );-- прочетете LSN от предишния loaddeclare @lsn binary(10) =(изберете max(lsn) от db_dst_cdc.dbo.log_cdc, където table_name ='localhost.db_src_cdc.dbo.Example');-- изчистете таблица преди пълното зареждане, ако @lsn е нулева съкращаване на таблицата db_dst_cdc.dbo.Example;-- заредете процес с incr_Example като (изберете row_number() над ( дял по идентификационен номер по __$start_lsn desc, __$seqval desc ) като __$ , * от db_src_cdc_ss.cdc.dbo_Example_CT, където __$operation <> 3 и __$start_lsn> @lsn), full_Example като (изберете * от db_src_cdc_ss.dbo.Пример, където @lsn е нулев идентификатор, cc select е null, c Заглавие, __$операция от incr_Example, където __$rn =1 обединение всички изберете идентификатор, заглавие, 2 като __$операция от full_Example) обединете db_dst_cdc.dbo. Пример като trg, като използвате cte_Example като src на trg.ID=src.ID, когато съвпадат и __$operation =1 след това изтрийте при съвпадение и __$operation <> 1 след това актуализирайте set trg.Title =src.Title, когато не съответства на целта и __$operation <> 1, след което вмъкнете (ID, Title) стойности (src.ID, src .Title);-- маркирайте края на процеса на зареждане и най-новите LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn)values ​​('localhost.db_src_cdc.dbo.Example', isnull((изберете от max(__$start_lsn) db_src_cdc_ss.cdc.dbo_Example_CT),0))-- изтрийте снимката на базата данни, ако съществува (изберете * от sys.databases, където name ='db_src_cdc_ss') пуснете база данни db_src_cdc_ss

[/expand]


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Защо не мога да изпълня агрегатна функция върху израз, съдържащ агрегат, но мога да го направя, като създам нов оператор за избор около него?

  2. Как мога да принудя рамката на обекта да вмъква колони за идентичност?

  3. Как да направите заявка с group_concat в sql сървър

  4. Неправилен синтаксис близо до ключовата дума 'with'...предишният израз трябва да бъде завършен с точка и запетая

  5. C# Еквивалент на SQL Server DataTypes