Database
 sql >> база данни >  >> RDS >> Database

Конфигуриране на Service Broker за асинхронна обработка

В последната си статия говорих за ползите от внедряването на асинхронна обработка с помощта на Service Broker в SQL Server пред другите методи, които съществуват за отделена обработка на дълги задачи. В тази статия ще разгледаме всички компоненти, които трябва да бъдат конфигурирани за основна конфигурация на Service Broker в една база данни, както и важните съображения за управлението на разговорите между услугите на брокера. За да започнем, ще трябва да създадем база данни и да активираме базата данни за използване на Service Broker:

СЪЗДАВАЙТЕ БАЗА ДАННИ AsyncProcessingDemo;GO IF (ИЗБЕРЕТЕ is_broker_enabled ОТ sys.databases WHERE name =N'AsyncProcessingDemo') =0BEGIN ПРОМЕНИ БАЗА ДАННИ AsyncProcessingDemo SET ENABLE_BROKER;ENDGO ИЗПОЛЗВАЙТЕ AsyncProcessing; 

Конфигуриране на компоненти за брокер

Основните обекти, които трябва да бъдат създадени в базата данни, са типовете съобщения за съобщенията, договор, който определя как ще се изпращат съобщенията между услугите, опашката и услугата инициатор, както и опашката и целевата услуга. Много примери онлайн за брокер на услуги показват сложно именуване на обекти за типове съобщения, договори и услуги за Service Broker. Въпреки това, няма изискване имената да са сложни и прости имена на обекти могат да се използват за всеки от обектите.

За съобщенията ще трябва да създадем тип съобщение за заявката, която ще се нарича AsyncRequest и тип съобщение за резултата, който ще се нарича AsyncResult . И двете ще използват XML, който ще бъде валидиран като правилно формиран от брокерските услуги, за да изпращат и получават данните, изисквани от услугите.

-- Създайте типовете съобщенияCREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MESSAGE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;

Договорът посочва, че AsyncRequest ще бъде изпратен от иницииращата услуга до целевата услуга и че целевата услуга ще върне AsyncResult съобщение обратно към иницииращата услуга. Договорът може също да посочи множество типове съобщения за инициатора и целта или че конкретен тип съобщение може да бъде изпратено от всяка услуга, ако конкретната обработка го изисква.

-- Създайте договорCREATE CONTRACT [AsyncContract] ( [AsyncRequest] ИЗПРАТЕНО ОТ ИНИЦИАТОР, [AsyncResult] ИЗПРАТЕНО ОТ ЦЕЛ);

За всяка от услугите трябва да се създаде опашка, която да осигури съхранение на съобщенията, получени от услугата. Целевата услуга, към която ще бъде изпратена заявката, трябва да бъде създадена, като посочи AsyncContract за да разрешите изпращането на съобщения до услугата. В този случай услугата се нарича ProcessingService и ще бъде създаден в ProcessingQueue в рамките на базата данни. Услугата за иницииране не изисква посочване на договор, което я прави в състояние да получава съобщения само в отговор на разговор, иницииран от нея.

-- Създаване на опашка за обработка и услуга - посочете договора, който позволява изпращане до услугатаCREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Създайте опашката за заявки и услугата CREATE QUEUE RequestQueue;CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Изпращане на съобщение за обработка

Както обясних в предишната статия, предпочитам да внедря съхранена процедура в обвивка за изпращане на ново съобщение до брокерска услуга, така че да може да бъде променено веднъж за мащабиране на производителността, ако е необходимо. Тази процедура е проста обвивка около създаването на нов разговор и изпращането на съобщението до ProcessingService .

-- Създайте процедурата за обвивка за изпращане на съобщенияCREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; ДЕКЛАРИРАНЕ @conversation_handle УНИКАЛЕН; ЗАПОЧНЕТЕ ТРАНЗАКЦИЯ; ЗАПОЧНЕТЕ ДИАЛОГОВ РАЗГОВОР @conversation_handle ОТ УСЛУГА @FromService КЪМ УСЛУГА @ToService НА ДОГОВОР @Договор С КРИПТОРАНЕ =ИЗКЛЮЧЕНО; ИЗПРАЩАНЕ НА РАЗГОВОР @conversation_handle ТИП СЪОБЩЕНИЕ @MessageType(@MessageBody); ИЗВЪРШЕТЕ ТРАНЗАКЦИЯ;ENDGO

Използвайки съхранената процедура в обвивка, вече можем да изпратим тестово съобщение до ProcessingService за да потвърдим, че сме настроили правилно брокерските услуги.

-- Изпратете заявка EXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Проверете за съобщение на опашката за обработкаSELECT CAST(message_body AS XML) FROM ProcessingQueue;GO

Обработване на съобщения

Докато бихме могли ръчно да обработваме съобщенията от ProcessingQueue , вероятно ще искаме съобщенията да се обработват автоматично, тъй като се изпращат до ProcessingService . За да направите това, трябва да бъде създадена съхранена процедура за активиране, която ще тестваме и след това по-късно ще се свържем с опашката, за да автоматизираме обработката при активиране на опашката. За да обработим съобщение, трябва да RECEIVE съобщението от опашката в рамките на транзакция, заедно с типа на съобщението и манипулатора на разговор за съобщението. Типът на съобщението гарантира, че подходящата логика се прилага към съобщението, което се обработва, а манипулаторът на разговор позволява да бъде изпратен отговор обратно към иницииращата услуга, когато съобщението е обработено.

RECEIVE команда позволява едно съобщение или множество съобщения в рамките на един манипулатор на разговор или група да бъдат обработени в една транзакция. За да се обработват множество съобщения, трябва да се използва променлива на таблица или за обработка на едно съобщение може да се използва локална променлива. Процедурата за активиране по-долу извлича едно съобщение от опашката, проверява типа съобщение, за да определи дали е AsyncRequest съобщение и след това изпълнява продължителния процес въз основа на получената информация за съобщението. Ако не получи съобщение в рамките на цикъла, той ще изчака до 5000 мс или 5 секунди, за да влезе друго съобщение в опашката, преди да излезе от цикъла и да прекрати изпълнението му. След обработка на съобщение, той изгражда AsyncResult съобщение и го изпраща обратно на инициатора на същия манипулатор на разговор, от който е получено съобщението. Процедурата също така проверява типа на съобщението, за да определи дали EndDialog или Error беше получено съобщение за изчистване на разговора, като го прекратите.

-- Създаване на процедура за обработка за обработка на queueCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN SET NOCOUNT ON; ДЕКЛАРИРАНЕ @conversation_handle УНИКАЛЕН; ДЕКЛАРИРАНЕ на @message_body XML; ДЕКЛАРИРАНЕ @име_тип_съобщение sysname; ДОКАТО (1=1) ЗАПОЧНЕТЕ ТРАНЗАКЦИЯТА; WAITFOR ( ПОЛУЧАВАНЕ НА ВЪРХУ (1) @conversation_handle =дръжка_на_разговор, @message_body =CAST(тяло_съобщение КАТО XML), @message_type_name =message_type_name ОТ ProcessingQueue), TIMEOUT 5000; АКО (@@ROWCOUNT =0) ЗАПОЧНЕ ТРАНЗАКЦИЯТА ЗА ОТМЕНЯНЕ; BREAK; END IF @message_type_name =N'AsyncRequest' BEGIN -- Обработвайте сложна дълга обработка тук -- За демонстрация ще изтеглим номера на сметката и ще изпратим отговор само DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]', 'INT'); -- Създайте съобщение за отговор и изпратете обратно DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber КАТО NVARCHAR(11)) + ' '; ИЗПРАЩАНЕ НА РАЗГОВОР @conversation_handle ТИП СЪОБЩЕНИЕ [AsyncResult] (@reply_message_body); КРАЙ -- Ако приключи диалоговото съобщение, завършете диалоговия прозорец ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Ако съобщение за грешка, регистрирайте и прекратете разговора ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Регистрирайте кода за грешка и извършете всяка необходима обработка тук -- Край разговорът за грешка END CONVERSATION @conversation_handle; КРАЙ НА ТРАНЗАКЦИЯТА ЗА КОМИТИРАНЕ; ENDENDGO

RequestQueue също ще трябва да обработи съобщенията, които са изпратени до него, така че допълнителна процедура за обработка на AsyncResult съобщенията, върнати от процедурата ProcessingQueueActivation, трябва да бъдат създадени. Тъй като знаем, че съобщението AsnycResult означава, че цялата работа по обработката е завършена, разговорът може да бъде прекратен, след като обработим това съобщение, което ще изпрати EndDialog съобщение до ProcessingService, което след това ще бъде обработено от неговата процедура за активиране, за да прекрати разговор почистване на всичко и избягване на огъня и забравяне на проблемите, които се случват, когато разговорите приключат правилно.

-- Създаване на процедура за обработка на отговори на опашката на заявкатаCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN SET NOCOUNT ON; ДЕКЛАРИРАНЕ @conversation_handle УНИКАЛЕН; ДЕКЛАРИРАНЕ на @message_body XML; ДЕКЛАРИРАНЕ @име_тип_съобщение sysname; ДОКАТО (1=1) ЗАПОЧНЕТЕ ТРАНЗАКЦИЯТА; WAITFOR ( ПОЛУЧАВАНЕ НА ВЪРХУ (1) @conversation_handle =дръжка_на_разговор, @message_body =CAST(тяло_съобщение КАТО XML), @message_type_name =message_type_name ОТ RequestQueue), TIMEOUT 5000; АКО (@@ROWCOUNT =0) ЗАПОЧНЕ ТРАНЗАКЦИЯТА ЗА ОТМЕНЯНЕ; BREAK; END IF @message_type_name =N'AsyncResult' BEGIN -- Ако е необходимо, обработвайте съобщението за отговор тук DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Тъй като това е цялата работа, която се извършва, прекратете разговора, за да изпратите съобщението EndDialog END CONVERSATION @conversation_handle; КРАЙ -- Ако приключи диалоговото съобщение, завършете диалоговия прозорец ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Ако съобщение за грешка, регистрирайте и прекратете разговора ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; КРАЙ НА ТРАНЗАКЦИЯТА ЗА КОМИТИРАНЕ; ENDENDGO

Тестване на процедурите

Преди да автоматизираме обработката на опашката за нашите услуги, е важно да тестваме процедурите за активиране, за да гарантираме, че те обработват съобщенията по подходящ начин и да предотвратим деактивирането на опашката, ако възникне грешка, която не се обработва правилно. Тъй като вече има съобщение в ProcessingQueue ProcessingQueueActivation може да се изпълни процедура за обработка на това съобщение. Имайте предвид, че WAITFOR ще доведе до прекратяване на процедурата за 5 секунди, въпреки че съобщението се обработва незабавно от опашката. След обработка на съобщението, можем да проверим, че процедурата работи правилно, като потърсим RequestQueue за да видите дали AsyncResult съобщение съществува и тогава можем да проверим дали RequestQueueActivation процедурата функционира правилно, като я изпълни.

-- Обработете съобщението от опашката за обработка EXECUTE dbo.ProcessingQueueActivation;GO -- Проверете за съобщение за отговор при заявка queueSELECT CAST(message_body AS XML) FROM RequestQueue;GO -- Обработете съобщението от опашката на заявкаEXECUTE dbo.RequestQueueActive 

Автоматизиране на обработката

Към този момент всички компоненти са завършени, за да автоматизираме напълно нашата обработка. Единственото, което остава, е да свържете процедурите за активиране към съответните им опашки и след това да изпратите друго тестово съобщение, за да потвърдите, че е обработено и нищо не остава в опашките след това.

-- Променете опашката за обработка, за да посочите вътрешно активиране ALTER QUEUE ProcessingQueue С АКТИВАЦИЯ ( СТАТУС =ВКЛЮЧЕНО, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, ИЗПЪЛНЕТЕ КАТО СЕБЕ на заявката); С АКТИВАЦИЯ ( СТАТУС =ВКЛЮЧЕНО, ПРОЦЕДУРА_ИМЕ =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, ИЗПЪЛНЕТЕ КАТО СЕБЕ );GO -- Тествайте автоматично активиране-- Изпратете заявка EXECUTE dbo.SendBrokerMessage @From'Request'S =vice N'Request'Ser ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Проверете за съобщение на опашката за обработка -- няма нищо, защото е обработено автоматично SELECT CAST(message_body AS XML) FROM ProcessingQueue;GO -- Проверете за съобщение за отговор на опашката за заявка -- нищо не е там, защото е обработено автоматично SELECT CAST(message_body КАТО XML) ОТ RequestQueue;GO

Резюме

Основните компоненти за автоматизирана асинхронна обработка в SQL Server Service Broker могат да бъдат конфигурирани в една настройка на базата данни, за да се даде възможност за отделена обработка на продължително изпълнявани задачи. Това може да бъде мощен инструмент за подобряване на производителността на приложението от изживяването на крайния потребител чрез отделяне на обработката от взаимодействията на крайния потребител с приложението.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Онлайн инструменти за изпробване на SQL дизайн и заявки

  2. Как да проектирате система, готова за локализация

  3. Решения за читатели за предизвикателството на специалните острови

  4. Моделиране на отворен пазар за образование

  5. Добавете колона към таблица в SQL