Have you ever wanted to stream data asynchronously out of SQL Server to an external application for some kind of background processing? Maybe you want to send email notifications. Maybe you want to feed data into something like redis, rabbitmq, or kafka. You can use a simple table as a queue, but you are always stuck with polling the table every so often. SQL Server actually has built in support for realtime asynchronous messaging through a less used feature called Service Broker.
This feature is under-used because it can be difficult to understand and complicated to set up. If you aren’t careful, it can also be leaky and eat up hidden storage on your server.
There is a built-in .Net feature called SqlDependency that attempts to be a wrapper around service broker to give you updates when a table changes. The problem is that this feature is known to be leaky and unreliable. You have to re-register the event handler after every update and it will miss any updates that arrive in between registrations.
There is another library called SqlDependencyEx that is a better wrapper around service broker. This library is actually really great. It also will call an event handler function when a table row is inserted, updated, or deleted. However, it creates resources in sql server automatically for you (which I’m not a fan of). It cleans up after itself pretty well, but this also means that it doesn’t queue up messages when the application isn’t running.
Below is a tutorial for setting up and sending messages through service broker manually yourself. You can find all of the source code on github.
First, set up a test database and enable service broker. It has to be enabled at the database level.
create database servicebrokertest go alter database servicebrokertest set enable_broker go use servicebrokertest go
Next, I’ll create some resources and helper functions inside of sql server to make it work like a queue.
create queue dbo.mcs_queue create service mcs_service on queue dbo.mcs_queue ([DEFAULT]) go -- push messages onto the fifo queue create procedure dbo.mcs_queue_push ( @message nvarchar(max) ) as begin declare @handle uniqueidentifier begin dialog @handle from service [mcs_service] to service 'mcs_service' on contract [DEFAULT] with encryption=off, lifetime=60; send on conversation @handle message type [DEFAULT] (@message) end conversation @handle end go -- pop messages off the queue create procedure dbo.mcs_queue_pop as begin declare @handle uniqueidentifier declare @message varbinary(max) waitfor( receive top(1) @handle = conversation_handle, @message = message_body from dbo.mcs_queue ), timeout 600000; -- 10 minutes begin try end conversation @handle; end try begin catch end catch select cast(@message as nvarchar(max)) as message end go -- view all messages in the queue create view dbo.mcs_queue_peek as select conversation_handle, message_enqueue_time, cast(message_body as nvarchar(max)) as message from dbo.mcs_queue where message_type_name = 'DEFAULT' go
You’ll notice that both sides need to call end conversation
. Otherwise, the system table sys.conversation_endpoints
will slowly fill up.
First push a test message onto the queue.
exec dbo.mcs_queue_push 'test message 1'
You can view messages in the queue.
select * from dbo.mcs_queue_peek
You can also pop a single message off of the queue.
exec mcs_queue_pop
This isn’t that interesting when you run it in sequence like this. The real power comes when you run mcs_queue_pop
in a separate window before pushing messages into the queue. If there aren’t any messages in the queue, it won’t return until a new message arrives. The waitfor
statement allows your application to be notified immediately when a message arrives. Instead of polling continuously for new messages, you can run a single query that doesn’t return until a message arrives. This pattern is known as long polling.
-- won't return exec mcs_queue_pop -- in a separate window exec dbo.mcs_queue_push 'test message 2'
You could write an application that ran mcs_queue_pop
inside a loop. This system would immediately get notified whenever mcs_queue_push
was called inside of the database. When the application was stopped, the system would queue up any unsent messages that would be picked up when the application started back up.
The following will uninstall all of the above resources.
-- delete all messages off of the queue create procedure dbo.mcs_queue_clear as begin declare @handles table (handle uniqueidentifier) insert into @handles select ce.conversation_handle from sys.services as s inner join sys.conversation_endpoints as ce on s.service_id = ce.service_id where s.name = 'mcs_service' declare @handle uniqueidentifier while exists(select * from @handles) begin select top 1 @handle = handle from @handles end conversation @handle with cleanup delete from @handles where handle = @handle end end go -- uninstall all resources for queues create procedure dbo.mcs_queue_uninstall as begin exec dbo.mcs_queue_clear drop view dbo.mcs_queue_peek drop procedure dbo.mcs_queue_clear drop procedure dbo.mcs_queue_pop drop procedure dbo.mcs_queue_push drop service mcs_service drop queue mcs_queue drop procedure dbo.mcs_queue_uninstall end go exec dbo.mcs_queue_uninstall go
Finally, you can tear down your test database.
use master go alter database servicebrokertest set single_user with rollback immediate drop database servicebrokertest go
I understand that is really just scratching the surface of what service broker can do. I also understand that this isn’t exactly the way service broker is intended to be used. However, it does present an interesting solution to the problem of streaming data out of sql server.
2019-02-12