Real-time Data Streaming from SQL Server

Have you ever wanted to stream data asynchronously out of SQL Server to an external application for some kind of background processing? Maybe you want to send email notifications. Maybe you want to feed data into something like Redis, RabbitMQ, or Kafka. You can use a simple table as a queue, but then you are stuck polling the table every so often. SQL Server actually has built-in support for real-time asynchronous messaging through a lesser-used feature called Service Broker.

This feature is under-used because it can be difficult to understand and complicated to set up. If you aren’t careful, it can also be leaky and eat up hidden storage on your server.

There is a built-in .NET feature called SqlDependency that attempts to wrap Service Broker and notify you when a table changes. The problem is that this feature is known to be leaky and unreliable. You have to re-register the event handler after every update, and it will miss any updates that arrive between registrations.

There is another library called SqlDependencyEx that is a better wrapper around Service Broker. This library is actually really great. It also calls an event handler function when a table row is inserted, updated, or deleted. However, it creates resources in SQL Server automatically for you (which I’m not a fan of). It cleans up after itself pretty well, but this also means that it doesn’t queue up messages when the application isn’t running.

Below is a tutorial for setting up Service Broker and sending messages through it yourself. You can find all of the source code on GitHub.

Setup

First, set up a test database and enable Service Broker. It has to be enabled at the database level.

create database servicebrokertest
go
alter database servicebrokertest set enable_broker
go
use servicebrokertest
go

Install

Next, I’ll create some resources and helper procedures inside of SQL Server to make it work like a queue.

create queue dbo.mcs_queue
create service mcs_service on queue dbo.mcs_queue ([DEFAULT])
go

-- push messages onto the fifo queue
create procedure dbo.mcs_queue_push (
	@message nvarchar(max)
) as
begin
	declare @handle uniqueidentifier;
	-- open a dialog from the service to itself (lifetime is in seconds)
	begin dialog @handle from service [mcs_service] to service 'mcs_service'
		on contract [DEFAULT] with encryption=off, lifetime=60;
	send on conversation @handle message type [DEFAULT] (@message);
	-- end the sending side of the conversation
	end conversation @handle;
end
go

-- pop messages off the queue
create procedure dbo.mcs_queue_pop as
begin
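	declare @handle uniqueidentifier;
	declare @message varbinary(max);

	-- block until a message arrives or the timeout expires
	waitfor(
		receive top(1)
			@handle = conversation_handle,
			@message = message_body
		from dbo.mcs_queue
	), timeout 600000; -- 10 minutes

	-- end the receiving side; @handle is still null if the wait timed out,
	-- so swallow the resulting error
	begin try
		end conversation @handle;
	end try
	begin catch
	end catch

	-- returns a single row; message is null if the wait timed out
	select cast(@message as nvarchar(max)) as message;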
end
go

-- view all messages in the queue
create view dbo.mcs_queue_peek as
	select
		conversation_handle,
		message_enqueue_time,
		cast(message_body as nvarchar(max)) as message
	from dbo.mcs_queue
	where message_type_name = 'DEFAULT'
go

You’ll notice that both sides call end conversation: mcs_queue_push ends the sending side and mcs_queue_pop ends the receiving side. Otherwise, the catalog view sys.conversation_endpoints will slowly fill up.
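
If you want to keep an eye on this from the application side, one option is to count the rows in sys.conversation_endpoints grouped by state. The following is only a sketch in Python, not part of the original source code: endpoint_counts and ENDPOINT_SQL are names I made up for illustration, and conn is assumed to be any DB-API connection to the test database (for example one opened with pyodbc).

```python
from typing import Any, Dict

# sys.conversation_endpoints has one row per conversation endpoint;
# state_desc is its human-readable state (CONVERSING, CLOSED, ...).
ENDPOINT_SQL = (
    "select state_desc, count(*) "
    "from sys.conversation_endpoints "
    "group by state_desc"
)


def endpoint_counts(conn: Any) -> Dict[str, int]:
    """Return the number of conversation endpoints per state.

    `conn` is assumed to be a DB-API 2.0 connection (hypothetical here).
    """
    cursor = conn.cursor()
    try:
        cursor.execute(ENDPOINT_SQL)
        # Each row is a (state_desc, count) pair.
        return {state: count for state, count in cursor.fetchall()}
    finally:
        cursor.close()
```

A count that keeps growing while the queue itself is empty is the sign that one side is not ending its conversations.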

Test

First push a test message onto the queue.

exec dbo.mcs_queue_push 'test message 1'

You can view messages in the queue.

select * from dbo.mcs_queue_peek 

You can also pop a single message off of the queue.

exec dbo.mcs_queue_pop

This isn’t that interesting when you run it in sequence like this. The real power comes when you run mcs_queue_pop in a separate window before pushing messages into the queue. If there aren’t any messages in the queue, the waitfor statement blocks until a new message arrives (or until the 10 minute timeout expires), so your application is notified immediately. Instead of polling continuously for new messages, you run a single query that doesn’t return until there is something to read. This pattern is known as long polling.

-- blocks until a message arrives (or the 10 minute timeout expires)
exec dbo.mcs_queue_pop

-- in a separate window
exec dbo.mcs_queue_push 'test message 2'

You could write an application that runs mcs_queue_pop inside a loop. It would be notified immediately whenever mcs_queue_push is called inside of the database. While the application is stopped, the queue holds any undelivered messages, and they are picked up when the application starts back up.
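
As a rough sketch of that loop, here is what the application side might look like in Python. It is not taken from the original source code: stream_messages and POP_SQL are names made up for illustration, and conn is assumed to be a DB-API connection opened in autocommit mode, so that each pop is committed as soon as it returns.

```python
from typing import Any, Iterator, Optional

# "set nocount on" is an assumption on my part: it keeps row-count
# messages from getting in the way of the procedure's result set.
POP_SQL = "set nocount on; exec dbo.mcs_queue_pop"


def stream_messages(conn: Any, limit: Optional[int] = None) -> Iterator[str]:
    """Yield messages from dbo.mcs_queue_pop, forever or until `limit`.

    `conn` is assumed to be a DB-API 2.0 connection in autocommit mode.
    """
    seen = 0
    while limit is None or seen < limit:
        cursor = conn.cursor()
        try:
            # Blocks inside SQL Server until a message arrives or the
            # procedure's 10 minute waitfor timeout expires.
            cursor.execute(POP_SQL)
            row = cursor.fetchone()
        finally:
            cursor.close()
        # On timeout the procedure returns one row whose message is NULL,
        # so just go around the loop and wait again.
        if row is None or row[0] is None:
            continue
        seen += 1
        yield row[0]
```

With pyodbc you might open the connection with pyodbc.connect(connection_string, autocommit=True) and then write for message in stream_messages(conn) to hand each message to Redis, RabbitMQ, Kafka, or whatever else you are feeding. Keep in mind that a message is gone from the queue once the pop returns, so if the application crashes while handling it, that message is lost.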

Uninstall

The following will uninstall all of the above resources.

-- delete all messages off of the queue
create procedure dbo.mcs_queue_clear as
begin
	declare @handles table (handle uniqueidentifier)
	
	insert into @handles
	select ce.conversation_handle
	from sys.services as s
	inner join sys.conversation_endpoints as ce on s.service_id = ce.service_id
	where s.name = 'mcs_service' 
	
	declare @handle uniqueidentifier
	
	while exists(select * from @handles)
	begin
		select top 1 @handle = handle from @handles	
	
		end conversation @handle with cleanup
	
		delete from @handles where handle = @handle
	end
end
go

-- uninstall all resources for queues
create procedure dbo.mcs_queue_uninstall as
begin
	exec dbo.mcs_queue_clear
	drop view dbo.mcs_queue_peek
	drop procedure dbo.mcs_queue_clear
	drop procedure dbo.mcs_queue_pop
	drop procedure dbo.mcs_queue_push
	drop service mcs_service
	drop queue mcs_queue
	drop procedure dbo.mcs_queue_uninstall
end
go

exec dbo.mcs_queue_uninstall
go

Teardown

Finally, you can tear down your test database.

use master
go
alter database servicebrokertest set single_user with rollback immediate
drop database servicebrokertest
go

Conclusion

I understand that this is really just scratching the surface of what Service Broker can do. I also understand that this isn’t exactly the way Service Broker is intended to be used. However, it does present an interesting solution to the problem of streaming data out of SQL Server.

2019-02-12