"Pushing" Notifications on Azure Service Bus
For the last several projects I’ve worked on, I’ve wanted a push notification system that I could use to send messages to role instances so that they could take actions. There are several push notification systems out there, but I was after something simple that would be included as part of my Windows Azure services. I’ve put a version of this concept into several proposals, but this week I finally got time to create a practical demo of the idea.
For this demo, I’ve chosen to use Windows Azure Service Bus Topics. Topics, unlike Windows Azure Storage queues, let multiple subscribers each receive a copy of a message. This was also an opportunity to dig into a feature of Windows Azure I haven’t worked with in over a year. Given how much the API has changed in that time, it was a frustrating, yet rewarding exercise.
The concept is fairly simple. Messages are sent to a centralized topic for distribution. Each role instance then creates its own subscriber with the appropriate filter on it so it receives the messages it cares about. This solution allows for multiple publishers and subscribers and will give me a decent amount of scale. I’ve heard reports/rumors of issues when you get beyond several hundred subscribers, but for this demo, we’ll be just fine.
Now for this demo implementation, I want to keep it simple. It should be a central class that worker or web roles can use to create their subscriptions and receive notifications with very little effort. And to keep this simplicity going, it should give me just as easy a way to send messages back out.
NotificationAgent
We’ll start by creating a class library for our centralized class, adding references to it for Microsoft.ServiceBus (so we can do our brokered messaging) and Microsoft.WindowsAzure.ServiceRuntime (for access to the role environment). I’m also going to create my NotificationTopic class.
Note: there are several supporting classes in the solution that I won’t cover in this article. If you want the full code for this solution, you can download it here.
The first method we’ll add is a constructor that takes the parameters we’ll need to connect to our service bus namespace, as well as the name/path of the topic we’ll use to broadcast notifications. Its first job is to create a namespace manager, so I can create topics and subscriptions, and a messaging factory that I’ll use to receive messages. I’ve split this out a bit so that my class can support being passed a TokenProvider (I hate demos that only use the service owner). But here are the important lines:
    // Build the credentials and address for the Service Bus namespace
    TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey);
    Uri namespaceAddress = ServiceBusEnvironment.CreateServiceUri("sb", baseAddress, string.Empty);
    // The manager creates topics/subscriptions; the factory creates message clients
    this.namespaceManager = new NamespaceManager(namespaceAddress, tokenProvider);
    this.messagingFactory = MessagingFactory.Create(namespaceAddress, tokenProvider);
We create a URI and a security token to use for interaction with our service bus namespace. For the sake of simplicity I’m using the issuer name (owner) and the service administration key. I’d never recommend this for a production solution, but it’s fine for demonstration purposes. We use these to create a NamespaceManager and MessagingFactory.
Now we need to create the topic, if it doesn’t already exist.
    try
    {
        // the existence check doesn't always protect us, so wrap it
        if (!this.namespaceManager.TopicExists(topicName))
            this.namespaceManager.CreateTopic(topicName);
    }
    catch (MessagingEntityAlreadyExistsException)
    {
        // ignore, timing issues could cause this
    }
Notice that I check to see if the topic exists, but I also trap the exception. That’s because I don’t want to assume the operation is single threaded. With this block of code running in many role instances, it’s possible for another instance to create the topic between my existence check and my create call. So I like to wrap both in a try/catch. You could also skip the check and just catch the exception, but I’ve long liked to avoid the overhead of unnecessary exceptions.
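The check-then-create pattern above can be pulled into a small reusable helper. This is only a sketch and not part of the downloadable solution: the EntitySetup class and EnsureCreated method are names I made up for illustration, and the helper deliberately takes delegates so it has no dependency on the Service Bus SDK.

```csharp
using System;

// Hypothetical helper (not in the original solution): runs "check, then create"
// and treats a specific "already exists" exception as a harmless lost race.
public static class EntitySetup
{
    // Returns true if this caller created the entity, false if it already
    // existed or another caller created it between the check and the create.
    public static bool EnsureCreated<TAlreadyExists>(Func<bool> exists, Action create)
        where TAlreadyExists : Exception
    {
        try
        {
            if (exists())
                return false;   // nothing to do, and no exception overhead

            create();
            return true;
        }
        catch (TAlreadyExists)
        {
            // Someone else won the race; the entity exists, which is all we need.
            return false;
        }
    }
}
```

With the namespace manager from the constructor, the call would look something like EntitySetup.EnsureCreated<MessagingEntityAlreadyExistsException>(() => namespaceManager.TopicExists(topicName), () => namespaceManager.CreateTopic(topicName)). Any other exception type still propagates to the caller, which is what I’d want for real connectivity or permission problems.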
Finally, I’ll create a TopicClient that I’ll use to send messages to the topic.
So once I’ve created an instance of this class, I can safely assume that the topic exists and that I have everything I need to send or receive messages.
Sending Messages
Next up, I create a SendMessage method that accepts a string message payload, the type of message, and a TimeSpan value that indicates how long the message should live. In this method we first create a BrokeredMessage, giving it an object that represents my notification message. We use the lifespan value that is passed in and set the type as a property. Finally, we send the message using the TopicClient we created earlier and do appropriate exception handling and cleanup.
    try
    {
        bm = new BrokeredMessage(msg);
        bm.TimeToLive = msgLifespan;
        // used for filtering
        bm.Properties[MESSAGEPROPERTY_TYPE] = messageType.ToString();
        topicClient.Send(bm);
        success = true;
    }
    catch (Exception)
    {
        success = false;
        // TODO: do something
    }
    finally
    {
        if (bm != null) // if it was created successfully
            bm.Dispose();
    }
Now the important piece here is the setting of a BrokeredMessage property. It’s this property that can be used later on to filter the messages we want to receive. So let’s not forget that. You’ll also notice I have a TODO left to add some intelligent exception handling, such as logging the issue.
Start Receiving
This is when things get a little more complicated. Now the experts (meaning the folks I know/trust that responded to my inquiry), recommend that instead of going “old school” and having a thread that’s continually polling for responses, we instead leverage async processing. So we’re going to make use of delegates.
First we need to define a delegate for the callback method:
    public delegate bool RecieverCallback(NotificationMessage message, NotificationMessageType type);
We then reference the new delegate in the method signature for the message receiving starter:
public void StartReceiving(RecieverCallback callback, NotificationMessageType msgType = NotificationMessageType.All)
Now inside this method we first need to create our subscriber. Since I want to have one subscriber for each role instance, I’ll need to get this from the Role Environment.
    // need to parse out deployment ID
    string instanceId = Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.Id;
    subscriptionName = instanceId.Substring(instanceId.IndexOf('.') + 1);
    SubscriptionDescription tmpSub = new SubscriptionDescription(topicName, subscriptionName);
Now is the point where we’ll add a filter using the property that we set on the notification when we created it.
    {
        Filter tmpFilter = new SqlFilter(string.Format("{0} = '{1}'", MESSAGEPROPERTY_TYPE, msgType));
        subscriptionClient.AddRule(SUBFILTER, tmpFilter);
    }
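The name parsing is easy to get subtly wrong, so here is a minimal, SDK-free sketch of it as a standalone helper. The SubscriptionNaming class, the FromInstanceId method, and the sample instance IDs in the comments are all made up for illustration. The 50-character cap reflects my understanding of the Service Bus subscription name limit, so treat it as an assumption and check it against the current quotas.

```csharp
using System;

// Hypothetical helper (not in the original solution) that derives a
// subscription name from a role instance ID.
public static class SubscriptionNaming
{
    // Assumption: subscription names are limited to 50 characters.
    public const int MaxLength = 50;

    // Returns everything after the first '.', or the whole ID when there is no '.'.
    // e.g. "deployment16(42).WorkerRole1_IN_0" becomes "WorkerRole1_IN_0" (made-up ID).
    public static string FromInstanceId(string instanceId)
    {
        if (string.IsNullOrEmpty(instanceId))
            throw new ArgumentException("Instance ID is required.", "instanceId");

        // IndexOf returns -1 when there is no '.', so Substring(0) keeps the full ID.
        string name = instanceId.Substring(instanceId.IndexOf('.') + 1);

        // Keep the tail when trimming: the instance suffix is what makes the name unique.
        if (name.Length > MaxLength)
            name = name.Substring(name.Length - MaxLength);

        return name;
    }
}
```

Inside StartReceiving this would be called as SubscriptionNaming.FromInstanceId(RoleEnvironment.CurrentRoleInstance.Id).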
I’m keeping it simple and using a SqlFilter on the property name we assigned when sending. So this subscription will only receive messages that match our filter criteria.
Now that all the setup is done, we’ll delete the subscription if it already exists (this gets rid of any messages and allows us to start clean) and create it anew using the NamespaceManager we instantiated in the class constructor. Then we start our async operation to retrieve messages:
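To make the filter logic easier to reason about (and to test without a Service Bus namespace), the expression can be built by a small pure function. This is a sketch under stated assumptions: the FilterExpressions class, the "MessageType" property name, and every enum member other than All are hypothetical. It also assumes that a subscriber asking for All wants every message, in which case it returns "1=1", the always-true SQL expression.

```csharp
using System;

// Hypothetical enum: only "All" appears in the article, the rest are placeholders.
public enum NotificationMessageType { All, Refresh, Shutdown }

// Hypothetical helper (not in the original solution) that builds the
// expression string to hand to a SqlFilter.
public static class FilterExpressions
{
    // Assumed value; must match the property name set on the outgoing message.
    public const string MessagePropertyType = "MessageType";

    public static string ForType(NotificationMessageType msgType)
    {
        // Assumption: "All" means "do not filter by type".
        if (msgType == NotificationMessageType.All)
            return "1=1";

        // Formatting an enum value yields its name, matching messageType.ToString() on send.
        return string.Format("{0} = '{1}'", MessagePropertyType, msgType);
    }
}
```

The result would be passed straight to the filter, as in new SqlFilter(FilterExpressions.ForType(msgType)). Because the values come from an enum rather than user input, there are no quotes to escape in the expression.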
    asyncresult = subscriptionClient.BeginReceive(waittime, ReceiveDone, subscriptionClient);
Here, ReceiveDone is the callback method for the operation. This method is pretty straightforward. We make sure we’ve gotten a message (in case the operation simply timed out) and that we can get the payload. Then, using the delegate we set up earlier, we hand the message to the caller. And then we end by starting another async call to get another message.
    if (result != null)
    {
        SubscriptionClient tmpClient = result.AsyncState as SubscriptionClient;
        BrokeredMessage brokeredMessage = tmpClient.EndReceive(result);
        //brokeredMessage.Complete(); // not really needed because your receive mode is ReceiveAndDelete
        if (brokeredMessage != null)
        {
            NotificationMessage tmpMessage = brokeredMessage.GetBody<NotificationMessage>();
            // do some type mapping here
            recieverCallback(tmpMessage, tmpType);
        }
    }
    // do receive for next message
    asyncresult = subscriptionClient.BeginReceive(ReceiveDone, subscriptionClient);
Now I’ve added two null checks in this method just to help out in case a receive operation fails. Even then, I won’t guarantee this works for all situations. In my tests, when I set the lifespan of a message to less than 5 seconds, I still had some issues (I’m still sorting those out, but wanted to get this sample out).
Client side implementation
Whew! Lots of setup there. This is where our hard work pays off. We define a callback method we’re going to hand into our notification helper class using the delegate we defined. We’ll keep it super simple:
    private bool NotificationRecieved(NotificationMessage message, NotificationMessageType type)
    {
        Console.WriteLine("Received Notification");
        return true;
    }
Now we need to instantiate our helper class and start the process of receiving messages. We can do this with a private variable to hold our object and a couple of lines in the role’s OnStart.
    tmpNotifier = new NotificationTopic(ServiceNamespace, IssuerName, IssuerKey, TopicName);
    tmpNotifier.StartReceiving(new NotificationTopic.RecieverCallback(NotificationRecieved), NotificationMessageType.All);
Now if we want to clean things up, we can also add some code to the role’s OnStop.
    try
    {
        if (tmpNotifier != null)
            tmpNotifier.StopReceiving();
    }
    catch (Exception e)
    {
        Console.WriteLine("Exception during OnStop: " + e.ToString());
    }
    base.OnStop();
And that’s all we need.
In Closing
So that’s it for our basic implementation. I’ve uploaded the demo for you to use at your own risk. You’ll need to update the WebRole, WorkerRole, and NotifierSample projects with the information about your Service Bus namespace. To run the demo, set the cloud service project as the startup project and launch it. Then right click on the NotifierSample project and start debugging on it as well.
While this demo may work fine for certain applications, there is definitely room for enhancement. We can tweak our message lifespan, wait timeouts, and even how many messages we retrieve at one time. And it’s also not the only way to accomplish this. But I think it’s a solid starting point if you need this kind of simple, self-contained notification service.
PS – As configured, this solution will require the ability to send outbound traffic on port 9354.
Published at DZone with permission of Brent Stineman, DZone MVB. See the original article here.