2018-10-04

Using Service Broker to implement Asynchronous Trigger

Suppose you have an OLTP system, which lets users to place purchase orders (PO), the UI response reply to the user must be fast, but there's a time consuming background process need to undergo for each PO placed by the users. Such system can be implemented by using triggers and Service Broker service. A trigger queues a message that requests work from a Service Broker service. The trigger does not actually perform the requested work. Instead, the trigger creates a message that contains information about the work to be done and sends this message to a service that performs the work. The trigger then returns. When the original transaction commits, Service Broker delivers the message to the destination service. The program that implements the service performs the work in a separate transaction. By performing this work in a separate transaction, the original transaction can commit immediately. The application avoids system slowdowns that result from keeping the original transaction open while performing the work.
Below is a demonstration. There's a main table named DepartmentMaster. It has an insert trigger, which sends a message to a service broker service. The service broker service is implemented by internal activation stored procedure on the target queue, with MAX_QUEUE_READERS = 5, meaning the maximum number of instances of the activation stored procedure that the queue starts at the same time. When you insert a row into DepartmentMaster table, finally an XML file will be created inside a folder C:\TEMP\PETER\ in your testing database server. For the sake of simplicity, the example uses OLE Automation Stored Procedures for SQL Server to write the XML file.

USE master;
GO
EXEC sp_configure 'show advanced options', 1;
GO
RECONFIGURE;
GO
EXEC sp_configure 'Ole Automation Procedures', 1;
GO
RECONFIGURE;
GO

CREATE DATABASE [SBTEST];
GO
ALTER DATABASE [SBTEST] SET TRUSTWORTHY ON;
ALTER DATABASE [SBTEST] SET ENABLE_BROKER;
GO

USE [SBTEST]
GO
CREATE PROCEDURE [dbo].[WriteToFile]
  @file varchar(2000),
  @text nvarchar(max)
AS  
BEGIN
  DECLARE @ole int;  
  DECLARE @fileId int;
  EXECUTE sp_OACreate 'Scripting.FileSystemObject', @ole OUT;
  EXECUTE sp_OAMethod @ole, 'OpenTextFile', @fileId OUT, @file, 8, 1;
  EXECUTE sp_OAMethod @fileId, 'WriteLine', Null, @text;
  EXECUTE sp_OADestroy @fileId;
  EXECUTE sp_OADestroy @ole;
END
GO
CREATE MESSAGE TYPE [TestMessage] VALIDATION = WELL_FORMED_XML
GO
CREATE CONTRACT [TestContract] ([TestMessage] SENT BY INITIATOR)
GO
CREATE QUEUE [dbo].[TestQueue] WITH STATUS = OFF
GO
CREATE SERVICE [TestServiceInitiator]  ON QUEUE [dbo].[TestQueue] ([TestContract])
GO
CREATE SERVICE [TestServiceTarget]  ON QUEUE [dbo].[TestQueue] ([TestContract])
GO
CREATE TABLE [dbo].[auditlog](
 [xmlstring] [xml] NULL,
 [logTime] [datetime2](2) NOT NULL,
 [isSuccess] [bit] NOT NULL,
 [err_num] [int] NULL,
 [err_msg] [nvarchar](4000) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
-- DEMO TABLE
CREATE TABLE [dbo].[DepartmentMaster](
 [DepartmentId] [int] IDENTITY(1,1) NOT NULL,
 [Name] [varchar](50) NULL,
 [Description] [varchar](50) NULL,
 CONSTRAINT [PK_DepartmentMaster1] PRIMARY KEY CLUSTERED
(
 [DepartmentId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
CREATE  TRIGGER [dbo].[Trg_DepartmentMaster_INSERT]
ON  [dbo].[DepartmentMaster]
FOR INSERT
AS
BEGIN
            SET NOCOUNT ON;
            DECLARE @MessageBody XML 
            DECLARE @TableId int 
            --get relevant information from inserted/deleted and convert to xml message 
            SET @MessageBody = (SELECT DepartmentId,Name,Description FROM inserted 
            FOR XML AUTO)               
            If (@MessageBody IS NOT NULL) 
            BEGIN 
      -- **** INSERT TRIGGER SEND a SB request message, then return immediately ****
                        DECLARE @Handle UNIQUEIDENTIFIER;  
                        BEGIN DIALOG CONVERSATION @Handle  
                        FROM SERVICE [TestServiceInitiator]  
                        TO SERVICE 'TestServiceTarget'  
                        ON CONTRACT [TestContract]  
                        WITH ENCRYPTION = OFF;  
                        SEND ON CONVERSATION @Handle  
                        MESSAGE TYPE [TestMessage](@MessageBody);
            END
END
GO
/* **** HANDLER SP, automatically fired by Service Broker **** */
CREATE PROCEDURE [dbo].[spMessageProcTest]
AS
SET NOCOUNT ON;
DECLARE @message_type varchar(100), @dialog uniqueidentifier, @message_body XML;
 
BEGIN TRY
   
 BEGIN TRAN;
 WAITFOR (
  RECEIVE TOP(1)
   @message_type = message_type_name,
   @message_body = CAST(message_body AS XML),
   @dialog = conversation_handle
  FROM dbo.TestQueue
 ), TIMEOUT 500
 ;
 -- Received an message
 IF (@@ROWCOUNT != 0 AND @message_body IS NOT NULL)
 BEGIN
  IF @message_type = 'TestMessage'
  BEGIN
   -- **** PROCESSING HERE, e.g. Export To File ****
   DECLARE @fileNum int = @message_body.value('(/inserted/@DepartmentId)[1]', 'int');
   DECLARE @fileContent nvarchar(max) = CAST(@message_body AS nvarchar(max));
   DECLARE @filePath varchar(2000) = 'C:\TEMP\PETER\' + CAST(@fileNum AS varchar(99)) + '.xml';
   EXEC dbo.WriteToFile @filePath, @fileContent;
   -- Success Log
   INSERT INTO auditlog (xmlstring, isSuccess) values(@message_body, 1);
  END
  END CONVERSATION @dialog;
 END
 COMMIT;
END TRY
BEGIN CATCH
 DECLARE @error int, @message nvarchar(4000);
 SELECT @error = ERROR_NUMBER(), @message = ERROR_MESSAGE();
 -- Uncommittable tx.
 IF XACT_STATE() = -1
 BEGIN
  ROLLBACK;
 END
 -- Error but Commitable tx.
 IF XACT_STATE() = 1
 BEGIN
  COMMIT;
 END
 -- FAILED Audit
 INSERT INTO auditlog (xmlstring, isSuccess, err_num, err_msg)
  VALUES (@message_body, 0, @error, @message);
 -- End Conversion with Error
 END CONVERSATION @dialog WITH error = @error DESCRIPTION = @message;
END CATCH
GO
ALTER QUEUE [dbo].[TestQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION (  STATUS = ON , PROCEDURE_NAME = [dbo].[spMessageProcTest] , MAX_QUEUE_READERS = 5 , EXECUTE AS OWNER  ), POISON_MESSAGE_HANDLING (STATUS = ON);
GO
-- TESTING --
INSERT DepartmentMaster (Name, Description) VALUES ('IT Dept', 'IT DEAPRTMENT');
SELECT TOP (1000) *, casted_message_body =
CASE message_type_name WHEN 'X'
  THEN CAST(message_body AS NVARCHAR(MAX))
  ELSE message_body
END
FROM [SBTEST2].[dbo].[TestQueue] WITH(NOLOCK)
;
SELECT xmlstring.value('(/inserted/@DepartmentId)[1]', 'int'), * FROM auditlog;
SELECT * FROM DepartmentMaster;

No comments:

Post a Comment