Self Updating WCF Routing Services on Windows Azure Using CloudFx
Several months ago, I started to develop a special project and I needed the ability to add multiple WCF services and make them available through one single endpoint. Looking for a solution, I stumbled over WCF-Routing services. This kind of service allows you to bundle multiple WCF endpoints from multiple WCF services and expose them through a single endpoint (Service Aggregation). But that is not the only advantage you have.
By intercepting the messages that come in through the WCF-Routing service you could also check for valid user tokens, implement authentication, and more. With all the required information in place, I wrote this little POC (Proof Of Concept) to demonstrate how it could be done. :) Here is the official MSDN definition of what a WCF-Routing service is:
The Routing Service is a generic SOAP intermediary that acts as a message router. The core functionality of the Routing Service is the ability to route messages based on message content, which allows a message to be forwarded to a client endpoint based on a value within the message itself, in either the header or the message body.
Another great advantage of the WCF-Routing services is that routes can be configured dynamically at runtime using RoutingExtensions based on the IExtension interface. This allows you to add endpoints of fellow WCF-Services by using Message filters. Here is the official MSDN definition:
The message filters used by the Routing Service provide common message selection functionality, such as evaluating the name of the endpoint that a message was sent to, the SOAP action, or the address or address prefix that the message was sent to. Filters can also be joined with an AND condition, so that messages will only be routed to an endpoint if the message matches both filters. You can also create custom filters by creating your own implementation of MessageFilter.
You can read more about the different types of message filters here: Message Filters on MSDN
In this implementation I use the EndpointAddressMessageFilter to expose the endpoint address of the WCF service on the Routing-Service. As mentioned before, WCF-Routing services can be configured dynamically to add WCF-Service endpoints at runtime. This is something I wanted to run fully automated and in a scalable way on Windows Azure. I am not going into the details of how WCF-Routing services work. You can read everything about WCF-Routing services on MSDN: Routing
The Basic Architecture

Worker Role Configuration (WCF-Routing service)
The “Endpoints” tab
We need a public endpoint for our routing service. On the property page for our worker role we can set a public input endpoint. This endpoint will accept simple TCP messages on the predefined port 10100.
The “Settings” tab
To be able to address the service later via a domain name and not only using the IP address of the host, an additional entry “Domain” is added. Because the service is running only on the local machine, the value is set to “localhost”. You can change that later, if you want to run this POC on Windows Azure.
The “ServiceDefinition.csdef” File
To allow the worker role to run with elevated privileges and to open custom ports for its endpoints, a “Runtime” element needs to be added within the “WorkerRole” tag:
    <Runtime executionContext="elevated" />
The POC implementation
Implementing the basic Routing-Service
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Net;
    using System.Threading;
    using Microsoft.WindowsAzure.ServiceRuntime;
    using System.ServiceModel;
    using System.ServiceModel.Routing;
    using SelfUpdatingServiceRouter.Behaviours;
    using System.ServiceModel.Description;

    namespace SelfUpdatingServiceRouter
    {
        public class WorkerRole : RoleEntryPoint
        {
            private string endPointAddress;
            private string listenAddress;
            private RoleInstanceEndpoint endpointAzure;

            /// <summary>
            /// Called by Windows Azure after the role instance has been initialized. This method
            /// serves as the main thread of execution for your role.
            /// </summary>
            /// <remarks>
            /// <para>
            /// Override the Run method to implement your own code to manage the role's execution.
            /// The Run method should implement a long-running thread that carries out operations
            /// for the role. The default implementation sleeps for an infinite period, blocking
            /// return indefinitely.
            /// </para>
            /// <para>
            /// The role recycles when the Run method returns.
            /// </para>
            /// <para>
            /// Any exception that occurs within the Run method is an unhandled exception.
            /// </para>
            /// </remarks>
            public override void Run()
            {
                //Create a new WCF host of type RoutingService
                using (ServiceHost host = new ServiceHost(typeof(RoutingService)))
                {
                    //Configure the host
                    this.ConfigureServiceHost(host);

                    while (true)
                    {
                        Thread.Sleep(10000);
                        Trace.TraceInformation("Routing Service Working...", "Information");
                        Trace.TraceInformation(listenAddress);
                    }
                }
            }

            /// <summary>
            /// Configures the service host.
            /// </summary>
            /// <param name="host">The host.</param>
            private void ConfigureServiceHost(ServiceHost host)
            {
                try
                {
                    endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"];
                    this.endPointAddress = string.Format("http://{0}/ServiceRouter", endpointAzure.IPEndpoint);
                    this.listenAddress = string.Format("http://{0}:{1}/ServiceRouter/",
                        RoleEnvironment.GetConfigurationSettingValue("Domain"),
                        endpointAzure.IPEndpoint.Port);

                    var httpBinding = new BasicHttpBinding();
                    httpBinding.SendTimeout = TimeSpan.FromMinutes(1);
                    httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1);

                    var routerEndpoint = host.AddServiceEndpoint(typeof(IRequestReplyRouter), httpBinding,
                        this.endPointAddress, new Uri(this.listenAddress));
                    routerEndpoint.Name = "RouterMain";

                    host.Description.Behaviors.Add(new RoutingBehavior(new RoutingConfiguration()));
                    host.Description.Behaviors.Add(new RoutingUpdateBehaviour());

                    ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
                    smb.HttpGetEnabled = true;
                    smb.HttpGetUrl = new Uri(this.endPointAddress);
                    host.Description.Behaviors.Add(smb);

                    ServiceDebugBehavior debug = host.Description.Behaviors.Find<ServiceDebugBehavior>();
                    // if not found - add behavior with setting turned on
                    if (debug == null)
                    {
                        host.Description.Behaviors.Add(
                            new ServiceDebugBehavior() { IncludeExceptionDetailInFaults = true });
                    }
                    else
                    {
                        // make sure setting is turned ON
                        if (!debug.IncludeExceptionDetailInFaults)
                        {
                            debug.IncludeExceptionDetailInFaults = true;
                        }
                    }

                    host.Open();
                }
                catch (Exception ex)
                {
                    Trace.TraceError(ex.Message);
                }
            }

            /// <summary>
            /// Called by Windows Azure to initialize the role instance.
            /// </summary>
            /// <remarks>
            /// <para>
            /// Override the OnStart method to run initialization code for your role.
            /// </para>
            /// <para>
            /// Before the OnStart method returns, the instance's status is set to Busy and the
            /// instance is not available for requests via the load balancer.
            /// </para>
            /// <para>
            /// If the OnStart method returns false, the instance is immediately stopped. If
            /// the method returns true, then Windows Azure starts the role by calling the
            /// <see cref="M:Microsoft.WindowsAzure.ServiceRuntime.RoleEntryPoint.Run" /> method.
            /// </para>
            /// <para>
            /// A web role can include initialization code in the ASP.NET Application_Start method
            /// instead of the OnStart method. Application_Start is called after the OnStart method.
            /// </para>
            /// <para>
            /// Any exception that occurs within the OnStart method is an unhandled exception.
            /// </para>
            /// </remarks>
            /// <returns>
            /// True if initialization succeeds, False if it fails. The default implementation
            /// returns True.
            /// </returns>
            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;

                // For information on handling configuration changes
                // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
                return base.OnStart();
            }
        }
    }
The Service Bus part - background
In the first version of this service, I used the plain Service Bus .NET assemblies. A few days ago I discovered the CloudFx (Cloud Application Framework & Extensions) library on NuGet. This library was originally written by Microsoft folks to boost the development of cloud-based projects. It is based on the “Reactive Extensions” (Rx) library. The CloudFx library allows you to use a publish/subscribe pattern on data streams. Data streams can be events, Twitter feeds, web service requests and more, and they can be queried using LINQ. Here is a small sample that uses a mouse-move event with the publish/subscribe pattern (Source: MSDN):
    /// Declare an observable
    public ISubject<MouseEventArgs> MouseMove;
    /// Publish data
    MouseMove.OnNext(args);
    /// Subscribe to an observable
    MouseMove.Subscribe(args => Display(args));
As you can see there is no usual Event/Delegate pattern visible anymore. An ISubject of type MouseEventArgs is created and the next time a mouse event occurs, the EventArgs are published to all subscribers. A few lines of code to create some awesomeness – love it! Here are some of the great features:
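To make the publish/subscribe idea more tangible without pulling in Rx or CloudFx, here is a minimal sketch that relies only on the IObservable<T> and IObserver<T> interfaces from the base class library. SimpleSubject<T> and CollectingObserver<T> are hypothetical names made up for this illustration; the real Rx subjects are considerably more robust (thread safety, completion semantics and so on).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal subject: it is both an observable (others can subscribe)
// and an observer (values can be pushed into it with OnNext).
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        // Disposing the returned token removes the observer again.
        return new Unsubscriber(observers, observer);
    }

    public void OnNext(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var o in observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in observers.ToArray()) o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose() { list.Remove(observer); }
    }
}

// Hypothetical observer that simply records every value it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();

    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Publishing is a call to OnNext on the subject, and every observer that is subscribed at that moment receives the value. This is the same shape as the MouseMove sample above, just without the Rx conveniences.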
- Send and receive messages asynchronously using a Service Bus Topic
- Send and receive unicast and multicast messages with Service Bus
- Simple use of “Put” and “Get” (send and receive) methods (Very handy to upload and download blobs)
- Error handling
- Inter-role communication
In general, it greatly reduces the code you need to implement these scenarios. Please see the excellent CloudFx Samples on MSDN.
Basic DAL implementation
The RouteMeModel class
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.DataAnnotations;
    using System.ComponentModel.DataAnnotations.Schema;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace Models
    {
        /// <summary>
        /// The model class holding the WCF properties
        /// to be sent to the routing service.
        /// </summary>
        [Table("RouteModel")]
        public class RouteMeModel
        {
            /// <summary>
            /// Gets or sets the service id.
            /// </summary>
            /// <value>The service id.</value>
            [Key]
            [Column("ServiceUID")]
            public string SericeUID { get; set; }

            /// <summary>
            /// Gets or sets the end point address.
            /// </summary>
            /// <value>The end point address.</value>
            [Column("EndPointAddress")]
            public string EndPointAddress { get; set; }

            /// <summary>
            /// Gets or sets the name of the service.
            /// </summary>
            /// <value>The name of the service.</value>
            [Column("ServiceName")]
            public string ServiceName { get; set; }

            /// <summary>
            /// Gets or sets the name of the contract.
            /// </summary>
            /// <value>The name of the contract.</value>
            [Column("ContractName")]
            public string ContractName { get; set; }

            /// <summary>
            /// Gets or sets the full name of the assembly.
            /// </summary>
            /// <value>The full name of the assembly.</value>
            [Column("FullAssemblyName")]
            public string FullAssemblyName { get; set; }
        }
    }
The RouteMeModel class will transport all the required data over the Service Bus to the Routing-Service, which includes:
- The endpoint address
- The name of the service
- The full name of the contract (including the namespace) implemented by the service
- The file-name of the executing assembly hosting the service implementation
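As a rough illustration of where the contract name and the assembly file name can come from, the following sketch derives both from a contract type via reflection. RouteInfo and its two methods are hypothetical helpers that are not part of the original solution, and the use of Assembly.Location is an assumption that only holds for assemblies loaded from disk.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the original solution.
public static class RouteInfo
{
    // Full contract name including the namespace, e.g. "MyCompany.Contracts.IOrderService".
    // This is the form Assembly.GetType expects when the type is resolved later on.
    public static string ContractNameOf(Type contract)
    {
        if (contract == null) throw new ArgumentNullException("contract");
        return contract.FullName;
    }

    // File name (without directory) of the assembly that defines the given type.
    // Assumption: the assembly was loaded from disk; for assemblies loaded from a
    // byte array Location is empty and this returns an empty string.
    public static string AssemblyFileNameOf(Type type)
    {
        if (type == null) throw new ArgumentNullException("type");
        return Path.GetFileName(type.Assembly.Location);
    }
}
```

A service could pass these values, together with its own endpoint address and service name, when it fills a RouteMeModel.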
Prerequisites
For our local development we will use SQL Server Express 2012. If you don’t have SQL Server Express 2012 installed on your local machine, you can get it here: Microsoft SQL Server 2012 Express
Adding a DbContext implementation and configuring it using Fluent-Configuration
Because we have a very simple model, the configuration of our DbContext is straightforward:
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Data.Entity;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace Models
    {
        public class RouteContext : DbContext
        {
            /// <summary>
            /// Gets or sets the services.
            /// </summary>
            /// <value>The services.</value>
            public DbSet<RouteMeModel> Services { get; set; }

            //TODO: INSERT YOUR CONNECTION STRING HERE
            public RouteContext()
                : base(@"[YOUR SQL CONNECTION STRING HERE]")
            {
            }

            /// <summary>
            /// This method is called when the model for a derived context has been initialized,
            /// but before the model has been locked down and used to initialize the context. The
            /// default implementation of this method does nothing, but it can be overridden in a
            /// derived class such that the model can be further configured before it is locked down.
            /// </summary>
            /// <param name="modelBuilder">The builder that defines the model for the context
            /// being created.</param>
            /// <remarks>
            /// Typically, this method is called only once when the first instance of a derived
            /// context is created. The model for that context is then cached and is for all further
            /// instances of the context in the app domain. This caching can be disabled by setting
            /// the ModelCaching property on the given ModelBuilder, but note that this can seriously
            /// degrade performance. More control over caching is provided through use of the
            /// DbModelBuilder and DbContextFactory classes directly.
            /// </remarks>
            protected override void OnModelCreating(DbModelBuilder modelBuilder)
            {
                //Autodetect changes
                this.Configuration.AutoDetectChangesEnabled = true;

                //Define the primary key
                modelBuilder.Entity<RouteMeModel>().HasKey<string>(r => r.SericeUID);

                //Set the constraints for the entity fields
                modelBuilder.Entity<RouteMeModel>().Property(r => r.ServiceName).HasMaxLength(120);

                //Max length for an identifier
                modelBuilder.Entity<RouteMeModel>().Property(r => r.ContractName).HasMaxLength(511);

                //Max length for a URL in a browser. Should fit
                modelBuilder.Entity<RouteMeModel>().Property(r => r.EndPointAddress).HasMaxLength(2083);

                //Max length for an assembly name. Including 256 chars for MAX_PATH + assembly full name
                //assuming it has to fit using the standard GAC path
                modelBuilder.Entity<RouteMeModel>().Property(r => r.FullAssemblyName).HasMaxLength(356);

                //let it do the needed stuff
                //base.OnModelCreating(modelBuilder);
            }
        }
    }
The next thing to do is to generate the data table using the “Package Manager Console” and EF. First you need to enable migrations, then add a migration, and finally update the database. Here are the three commands to type into the “Package Manager Console”:
- enable-migrations
- add-migration Initial (initial is the name for the migration)
- update-database
Dynamic WCF-Router configuration – base scenario
It must be possible to send WCF-Service endpoint data to a Service Bus topic subscriber. The subscriber (in our case the WCF-Routing service) manages the received endpoint data and transforms it into endpoints that are added dynamically to the routing table of the WCF-Routing service. This is a perfect use case for a unicast scenario: messages are addressed to only one specific receiver, which is the WCF-Routing service.
Installing CloudFx using NuGet
We need to install the latest pre-release to make it work with the latest Azure SDK (2.1 at the time of this writing).
Configuring CloudFx
CloudFx is configured using standard app.config files. To simplify the configuration process, I took the configuration file from the CloudFx samples solution and adapted it to my needs. The most important part is the one that holds the Service Bus settings:
    <ServiceBusConfiguration defaultEndpoint="wcfrouter" defaultNamespace="[YOUR NAMESPACE HERE]" defaultIssuerName="owner" defaultIssuerSecret="[YOUR KEY HERE]">
      <add name="wcfrouter" endpointType="Topic" topicPath="WcfRouterSample" />
    </ServiceBusConfiguration>
In short the values are:
- defaultEndpoint is the name of the CloudFx endpoint that is used by default
- defaultNamespace is your Service Bus namespace name
- defaultIssuerName is the issuer of your ACS token on Azure (“owner” by default)
- defaultIssuerSecret is the default key (ACS token) issued by “owner”
- endpointType is the type of the endpoint, such as topic, queue, relay and so forth
- topicPath is where you set the name of the topic, which will be created if it does not exist already
The subscription name will be set to the “To” property of the routing-message context, in this sample “ServiceRouter”. More about the routing-message context later.
WCF-Router Side Implementation
On the WCF-Router side the following things need to be done:
- Check whether there are entries in the RouteModel table; if so, check whether the services have already been added and, if not, add the new route to the table
- Listen for incoming requests signaling that a WCF-Service wants to be added. Before the service is added, check whether it is already in the database; if so, check whether something has changed and update it, otherwise just add it or reload it from the database
- Routing happens based on filtering endpoint addresses
- The filter-table and filters will be added dynamically per request
- The contracts will be dynamically added using contract descriptions
- A separate assembly that contains the service contract will be downloaded from blob-storage and used for the contract descriptions
- The dynamic update functionality will be implemented using a custom behavior and a custom extension implementation
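The bookkeeping behind the first steps of this list can be sketched with plain collections. The class below is a hypothetical in-memory stand-in for the RouteModel table and the router's filter table: it registers a service only once per service/contract pair and maps the public router address (router base address plus service name) to the backend endpoint. It deliberately uses no WCF or Entity Framework types, so it only illustrates the logic and not the actual implementation that follows.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the RouteModel table plus the filter table.
public sealed class RouteTableSketch
{
    private readonly string routerBaseAddress;

    // Key: public address exposed by the router; value: backend endpoint address.
    private readonly Dictionary<string, string> routes =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

    // Service/contract pairs that have already been registered.
    private readonly HashSet<string> known =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase);

    public RouteTableSketch(string routerBaseAddress)
    {
        this.routerBaseAddress = routerBaseAddress.TrimEnd('/');
    }

    // Returns true when a new route was added and false when the
    // service/contract pair was already known (nothing is changed then).
    public bool TryRegister(string serviceName, string contractName, string backendAddress)
    {
        if (!known.Add(serviceName + "|" + contractName))
        {
            return false;
        }

        routes[routerBaseAddress + "/" + serviceName] = backendAddress;
        return true;
    }

    // Resolves an incoming router address to the backend endpoint address,
    // or returns null when no route matches.
    public string Resolve(string incomingAddress)
    {
        string backend;
        return routes.TryGetValue(incomingAddress.TrimEnd('/'), out backend) ? backend : null;
    }
}
```

In the real router the lookup is done by an EndpointAddressMessageFilter in the routing configuration, and the duplicate check is a database query.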
All the requirements are packed into a custom implementation of the IExtension<T> interface, which can be found in the System.ServiceModel namespace (System.ServiceModel.dll). The IExtension<T> interface allows you to extend the following types:
- System.ServiceModel.IExtensibleObject<T>
- System.ServiceModel.IContextChannel
- System.ServiceModel.ServiceHostBase (That’s what happens here)
- System.ServiceModel.InstanceContext
- System.ServiceModel.OperationContext
Personally, I see a service behavior as a kind of “bootstrapper” that allows you to add “plugins” (extensions) to a WCF service at runtime (this is what I use them for). A service behavior exposes the service host if you implement the IServiceBehavior interface, so that you can add extensions to that specific host. The “hook” we use to add an extension to the current service host is to implement the IServiceBehavior interface and to derive from the abstract class BehaviorExtensionElement (which represents a WCF configuration element). Then we can use the ApplyDispatchBehavior method (which comes from IServiceBehavior) to add our extension to the service host.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.ServiceModel.Configuration;
    using System.ServiceModel.Description;
    using System.Text;
    using System.Threading.Tasks;
    using SelfUpdatingServiceRouter.Extensions;

    namespace SelfUpdatingServiceRouter.Behaviours
    {
        public class RoutingUpdateBehaviour : BehaviorExtensionElement, IServiceBehavior
        {
            /// <summary>
            /// Gets the type of behavior.
            /// </summary>
            /// <value>The type of behavior.</value>
            public override Type BehaviorType
            {
                get { return typeof(RoutingUpdateBehaviour); }
            }

            /// <summary>
            /// Creates a behavior extension based on the current configuration settings.
            /// </summary>
            /// <returns>The behavior extension.</returns>
            protected override object CreateBehavior()
            {
                return new RoutingUpdateBehaviour();
            }

            /// <summary>
            /// Adds the binding parameters.
            /// </summary>
            /// <param name="serviceDescription">The service description.</param>
            /// <param name="serviceHostBase">The service host base.</param>
            /// <param name="endpoints">The endpoints.</param>
            /// <param name="bindingParameters">The binding parameters.</param>
            public void AddBindingParameters(ServiceDescription serviceDescription,
                System.ServiceModel.ServiceHostBase serviceHostBase,
                System.Collections.ObjectModel.Collection<ServiceEndpoint> endpoints,
                System.ServiceModel.Channels.BindingParameterCollection bindingParameters)
            {
                //Not implemented
            }

            /// <summary>
            /// Provides the ability to change run-time property values or insert custom
            /// extension objects such as error handlers, message or parameter interceptors,
            /// security extensions, and other custom extension objects.
            /// </summary>
            /// <param name="serviceDescription">The service description.</param>
            /// <param name="serviceHostBase">The host that is currently being built.</param>
            public void ApplyDispatchBehavior(ServiceDescription serviceDescription,
                System.ServiceModel.ServiceHostBase serviceHostBase)
            {
                RouterUpdateExtension updateExtension = new RouterUpdateExtension();
                serviceHostBase.Extensions.Add(updateExtension);
            }

            /// <summary>
            /// Provides the ability to inspect the service host and the service description
            /// to confirm that the service can run successfully.
            /// </summary>
            /// <param name="serviceDescription">The service description.</param>
            /// <param name="serviceHostBase">The service host that is currently being constructed.</param>
            public void Validate(ServiceDescription serviceDescription,
                System.ServiceModel.ServiceHostBase serviceHostBase)
            {
                //Not implemented
            }
        }
    }
The extension for our service host (which in this case is the WCF-Routing service) is realized by implementing the IExtension<ServiceHostBase> interface and the IDisposable interface. This is the most powerful piece of code in the whole solution. Maybe this is the most comprehensive example of how to configure a WCF-Routing service dynamically using code only.
    using Microsoft.Experience.CloudFx.Framework.Configuration;
    using Microsoft.Experience.CloudFx.Framework.Messaging;
    using Microsoft.Experience.CloudFx.Framework.Storage;
    using Microsoft.WindowsAzure;
    using Microsoft.WindowsAzure.ServiceRuntime;
    using Models;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Reactive;
    using System.Reflection;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.ServiceModel.Dispatcher;
    using System.ServiceModel.Routing;
    using System.Text;
    using System.Threading.Tasks;

    namespace SelfUpdatingServiceRouter.Extensions
    {
        class RouterUpdateExtension : IExtension<ServiceHostBase>, IDisposable
        {
            private RoleInstanceEndpoint endpointAzure;
            ServiceHostBase owner;
            IObserver<RouteMeModel> modelObserver;
            ServiceBusPublishSubscribeChannel pubSubChannel;
            List<ServiceEndpoint> serviceEndPoints;

            //The routing configuration to use
            RoutingConfiguration rc;

            /// <summary>
            /// Attaches the specified owner.
            /// </summary>
            /// <param name="owner">The owner.</param>
            public void Attach(ServiceHostBase owner)
            {
                this.owner = owner;
                //This could be done using the owner, you can read it near the bottom of this code-file
                endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"];
                //start the fun
                this.Init();
            }

            /// <summary>
            /// Detaches the specified owner.
            /// </summary>
            /// <param name="owner">The owner.</param>
            public void Detach(ServiceHostBase owner)
            {
                this.Dispose();
            }

            public void Dispose()
            {
                //Dispose all the stuff
            }

            /// <summary>
            /// Inits the Service Bus stuff
            /// </summary>
            private void Init()
            {
                this.serviceEndPoints = new List<ServiceEndpoint>();
                this.rc = new RoutingConfiguration();
                this.AddRoutingEntries();
                this.SetupServiceBus();
            }

            /// <summary>
            /// Adds the routing entries.
            /// </summary>
            private void AddRoutingEntries()
            {
                using (var ctx = new RouteContext())
                {
                    if (ctx.Services.Count() > 0)
                    {
                        foreach (var entry in ctx.Services)
                        {
                            //Add a route for the service
                            AddServiceBusEntry(entry);
                        }
                    }
                }
            }

            /// <summary>
            /// Setups the service bus.
            /// </summary>
            private void SetupServiceBus()
            {
                //Setup the service subscription channel
                var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(
                    ServiceBusConfigurationSection.SectionName);
                pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint));
                CreateSubscriptionForService(pubSubChannel, "RoutingService");
            }

            /// <summary>
            /// Creates the subscription for service.
            /// </summary>
            /// <param name="pubSubChannel">The pub sub channel.</param>
            /// <param name="serviceName">The name of the service to subscribe for.</param>
            private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName)
            {
                //Define a filter to receive only messages that have a specific "To" property
                var filter = FilterExpressions.GroupOr(
                    FilterExpressions.MatchTo(serviceName),
                    FilterExpressions.MatchTo("ServiceRouter"));

                //Now we need to create an observer, that will check for new incoming messages
                modelObserver = Observer.Create<RouteMeModel>(msg =>
                {
                    var exists = CheckIfRoutingEntryExists(msg);
                    if (!exists)
                    {
                        AddNewServiceEntry(msg);
                    }
                });

                pubSubChannel.Subscribe(serviceName, modelObserver, filter);
            }

            /// <summary>
            /// Adds the new service entry.
            /// </summary>
            /// <param name="msg">The message.</param>
            private void AddNewServiceEntry(RouteMeModel msg)
            {
                using (var ctx = new RouteContext())
                {
                    ctx.ChangeTracker.DetectChanges();
                    msg.SericeUID = Guid.NewGuid().ToString();
                    ctx.Services.Add(msg);
                    ctx.SaveChanges();
                    //Add a route for the service
                    AddServiceBusEntry(msg);
                }
            }

            /// <summary>
            /// Checks if routing entry exists.
            /// </summary>
            /// <param name="msg">The message.</param>
            /// <returns>True if an entry with the same service and contract name exists.</returns>
            private bool CheckIfRoutingEntryExists(RouteMeModel msg)
            {
                using (var ctx = new RouteContext())
                {
                    //Compare the stored entry against the incoming message
                    //(the original compared msg.ContractName with itself, which is always true)
                    var entry = (from service in ctx.Services
                                 where service.ServiceName.Equals(msg.ServiceName)
                                    && service.ContractName.Equals(msg.ContractName)
                                 select service).FirstOrDefault();
                    return entry != null;
                }
            }

            /// <summary>
            /// Adds the service bus entry.
            /// </summary>
            /// <param name="message">The message.</param>
            private void AddServiceBusEntry(RouteMeModel message)
            {
                //Load the contract assembly from blob storage
                //Get the current type of contract to add to the client endpoint
                var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
                var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));
                var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName");
                var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName");

                byte[] data = null;
                using (MemoryStream mstream = new MemoryStream())
                {
                    var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
                    if (gotIt)
                    {
                        //Get the byte content. This method does not care about the position
                        data = mstream.ToArray();
                    }
                }

                //Fail with a clear message instead of passing null to Assembly.Load
                if (data == null)
                {
                    throw new InvalidOperationException("The contract assembly could not be downloaded from blob storage.");
                }

                //Now we load the contract assembly
                var assembly = Assembly.Load(data);
                Type contractType = assembly.GetType(message.ContractName);

                //The contract description we use
                var conDesc = ContractDescription.GetContract(contractType);
                var HTTPbinding = new BasicHttpBinding();
                var currentServiceEndPoint = new ServiceEndpoint(
                    conDesc,
                    HTTPbinding,
                    new EndpointAddress(message.EndPointAddress));
                currentServiceEndPoint.Name = message.ServiceName;

                var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault();
                var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter));
                var rEndPoint = new ServiceEndpoint(conDescRouter, new BasicHttpBinding(),
                    new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));
                rEndPoint.Name = message.ServiceName;
                this.owner.AddServiceEndpoint(rEndPoint);

                var addressFilter = new EndpointAddressMessageFilter(
                    new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));

                //We don't want to route on headers only
                rc.RouteOnHeadersOnly = false;

                //Add the filter table
                rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint });

                //Apply the dynamic configuration
                this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc);
            }

            /// <summary>
            /// Checks if end point was added.
            /// </summary>
            /// <param name="host">The host.</param>
            /// <param name="endpointAddress">The endpoint address.</param>
            /// <returns>True if an endpoint with this listen address is already present.</returns>
            private static bool CheckIfEndPointWasAdded(ServiceHostBase host, string endpointAddress)
            {
                bool isPresent = false;
                foreach (var endpoint in host.Description.Endpoints.ToList())
                {
                    if (endpoint.ListenUri.AbsoluteUri.Equals(endpointAddress))
                    {
                        isPresent = true;
                        break;
                    }
                }
                return isPresent;
            }
        }
    }
The main players in this implementation are the following methods:
- AddRoutingEntries and AddNewServiceEntry, responsible for reading endpoint entries from the database and adding new ones to it
- SetupServiceBus and CreateSubscriptionForService, both responsible for setting up a CloudFx driven Service Bus Topic subscription
- AddServiceBusEntry (should be renamed), responsible for the dynamic WCF-Router configuration
The SetupServiceBus-Method
This method loads the CloudFx Service Bus configuration using the CloudFx-specific CloudApplicationConfiguration class. After loading the configuration, it sets up a ServiceBusPublishSubscribeChannel using the default endpoint defined in the Service Bus configuration of the app.config file. The ServiceBusPublishSubscribeChannel can be used to publish messages to a specific Service Bus topic, or to subscribe to messages on a specific Service Bus topic. In this case we create a subscription and wait for topic messages to arrive.
    /// <summary>
    /// Setups the service bus.
    /// </summary>
    private void SetupServiceBus()
    {
        //Setup the service subscription channel
        var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(
            ServiceBusConfigurationSection.SectionName);
        pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint));
        CreateSubscriptionForService(pubSubChannel, "RoutingService");
    }
The CreateSubscriptionForService-Method
That’s where the real magic of CloudFx happens. First we set up a filter expression for the incoming messages. We receive all messages whose “To” property contains either the name of the service OR the string constant “ServiceRouter”. All other messages are not of any interest. Then we create an observer for RouteMeModel messages using Observer.Create<RouteMeModel>. The observer handles the messages from the Service Bus topic that pass the filter criteria we defined before. The last thing we do is to subscribe to our pub/sub channel using the service name (channel name), our observer, and the filter. From now on the message loop is running, ready to receive messages asynchronously. That’s pretty cool!
    /// <summary>
    /// Creates the subscription for service.
    /// </summary>
    /// <param name="pubSubChannel">The pub sub channel.</param>
    /// <param name="serviceName">The name of the service to subscribe for.</param>
    private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName)
    {
        //Define a filter to receive only messages that have a specific "To" property
        var filter = FilterExpressions.GroupOr(
            FilterExpressions.MatchTo(serviceName),
            FilterExpressions.MatchTo("ServiceRouter"));

        //Now we need to create an observer, that will check for new incoming messages
        modelObserver = Observer.Create<RouteMeModel>(msg =>
        {
            var exists = CheckIfRoutingEntryExists(msg);
            if (!exists)
            {
                AddNewServiceEntry(msg);
            }
        });

        pubSubChannel.Subscribe(serviceName, modelObserver, filter);
    }
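The effect of the OR filter on the “To” property can be modelled with plain predicates. The sketch below is not the CloudFx FilterExpressions API and says nothing about how CloudFx evaluates filters internally; Envelope and ToFilters are hypothetical names that only mirror the observable behaviour described above: a message passes if its “To” value equals any of the accepted recipients.

```csharp
using System;

// Hypothetical message envelope: only the "To" routing property matters here.
public sealed class Envelope
{
    public string To { get; set; }
    public string Body { get; set; }
}

// Hypothetical predicate helpers that mimic the semantics of the filter above.
public static class ToFilters
{
    // Matches messages addressed to exactly this recipient.
    public static Func<Envelope, bool> MatchTo(string recipient)
    {
        return e => string.Equals(e.To, recipient, StringComparison.Ordinal);
    }

    // Logical OR: the combined filter passes if at least one inner filter passes.
    public static Func<Envelope, bool> GroupOr(params Func<Envelope, bool>[] filters)
    {
        return e =>
        {
            foreach (var filter in filters)
            {
                if (filter(e)) return true;
            }
            return false;
        };
    }
}
```

With a filter built from MatchTo("RoutingService") and MatchTo("ServiceRouter"), a message published with To = "ServiceRouter" is accepted, while a message addressed to any other recipient is ignored.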
The AddServiceBusEntry-Method
This method implements some really cool things:
- Loading the contract assembly for the services to be added to the WCF-Router from Azure Blob storage using CloudFx!
- It adds the endpoint addresses of the WCF-Services to route to as well as the filters to the WCF-Routing service and re-configures the WCF-Routing service at runtime!
This line reads the storage connection string from the configuration:
    var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
This line creates a new ReliableCloudBlobStorage instance:
    var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));
And this line downloads the contract-assembly blob, using the ReliableCloudBlobStorage instance:
    var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
It returns true if the download was successful, otherwise false. That’s all the code required to download a blob. To upload a blob, you use the Put method :)
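Once the blob content is available as a byte array, turning it into a contract type is plain reflection. The following sketch isolates that step; ContractLoader is a hypothetical helper that is not part of the original solution, and it assumes the bytes are a valid managed assembly that contains the requested type.

```csharp
using System;
using System.Reflection;

// Hypothetical helper that isolates the "bytes to contract type" step.
public static class ContractLoader
{
    // Loads an assembly from raw bytes (for example a downloaded blob)
    // and resolves a type by its full name, including the namespace.
    public static Type LoadContract(byte[] assemblyBytes, string fullTypeName)
    {
        if (assemblyBytes == null || assemblyBytes.Length == 0)
        {
            // A failed download leaves no content; report it clearly
            // instead of passing null to Assembly.Load.
            throw new ArgumentException("No assembly content; the download probably failed.", "assemblyBytes");
        }

        Assembly assembly = Assembly.Load(assemblyBytes);

        // throwOnError: true raises a descriptive exception when the
        // type is missing instead of silently returning null.
        return assembly.GetType(fullTypeName, true);
    }
}
```

Checking the download result before calling Assembly.Load makes a missing blob much easier to diagnose than the ArgumentNullException that would otherwise surface.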
/// <summary>
/// Adds the service bus entry.
/// </summary>
/// <param name="message">The message.</param>
private void AddServiceBusEntry(RouteMeModel message)
{
    //Load the contract assembly from blob storage
    //Get the current type of contract to add to the client endpoint
    var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
    var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));
    var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName");
    var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName");

    byte[] data = null;
    using (MemoryStream mstream = new MemoryStream())
    {
        var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
        if (gotIt)
        {
            //Get the byte content. This method does not care about the stream position
            data = mstream.ToArray();
        }
    }

    //Now we load the contract assembly
    var assembly = Assembly.Load(data);
    Type contractType = assembly.GetType(message.ContractName);

    //The contract description we use
    var conDesc = ContractDescription.GetContract(contractType);
    var HTTPbinding = new BasicHttpBinding();
    var currentServiceEndPoint = new ServiceEndpoint(
        conDesc,
        HTTPbinding,
        new EndpointAddress(message.EndPointAddress));
    currentServiceEndPoint.Name = message.ServiceName;

    var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault();
    var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter));
    var rEndPoint = new ServiceEndpoint(conDescRouter, new BasicHttpBinding(),
        new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));
    rEndPoint.Name = message.ServiceName;
    this.owner.AddServiceEndpoint(rEndPoint);

    var addressFilter = new EndpointAddressMessageFilter(new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));

    //We don't want to route on headers only
    rc.RouteOnHeadersOnly = false;
    //Add the filter table
    rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint });
    //Apply the dynamic configuration
    this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc);
}
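One thing to watch in the method above: if the blob download fails, data stays null and Assembly.Load throws an ArgumentNullException that says little about the real cause. A minimal sketch of a guard could look like the following. The ContractLoader class and its LoadContractType method are hypothetical names, not part of the original solution, and the sketch uses only System.Reflection calls.

```csharp
using System;
using System.Reflection;

// Hypothetical helper (not part of the original solution): validates the
// downloaded bytes and the contract name before touching Assembly.Load.
public static class ContractLoader
{
    public static Type LoadContractType(byte[] assemblyBytes, string contractName)
    {
        // The download reported failure or returned nothing: fail with a clear message.
        if (assemblyBytes == null || assemblyBytes.Length == 0)
        {
            throw new InvalidOperationException(
                "The contract assembly could not be downloaded from blob storage.");
        }

        // A routing message without a contract name cannot be resolved to a type.
        if (string.IsNullOrWhiteSpace(contractName))
        {
            throw new ArgumentException("A contract type name is required.", "contractName");
        }

        Assembly assembly = Assembly.Load(assemblyBytes);

        // throwOnError: true turns a missing type into an exception here,
        // instead of a null that only surfaces later in ContractDescription.GetContract.
        return assembly.GetType(contractName, true);
    }
}
```

With such a helper, the two lines that call Assembly.Load and assembly.GetType could be replaced by a single call like ContractLoader.LoadContractType(data, message.ContractName), and a failed download would then show up as a readable error in the role's trace output.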
The Publisher Part
The publishing part is implemented in a separate assembly called “ServiceMessenger”. It contains only one class, the Messenger class. This assembly needs to be referenced by any WCF-Service that wants to add its endpoints to the WCF-Router.
using Microsoft.Experience.CloudFx.Framework.Configuration;
using Microsoft.Experience.CloudFx.Framework.Messaging;
using Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceMessenger
{
    public class Messenger
    {
        private RouteMeModel model;

        /// <summary>
        /// Initializes a new instance of the <see cref="Messenger" /> class.
        /// </summary>
        /// <param name="endPointAddress">The endpoint address of the service.</param>
        /// <param name="contracName">The full name of the service contract type.</param>
        /// <param name="assemblyName">The name of the contract assembly.</param>
        /// <param name="serviceName">The name of the service.</param>
        public Messenger(string endPointAddress, string contracName, string assemblyName, string serviceName)
        {
            this.model = new RouteMeModel()
            {
                ContractName = contracName,
                EndPointAddress = endPointAddress,
                FullAssemblyName = assemblyName,
                ServiceName = serviceName
            };
        }

        /// <summary>
        /// Publishes the routing entry to the router via the Service Bus.
        /// </summary>
        public void SendMessageToRouter()
        {
            //Set up the publish/subscribe channel
            var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);
            var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" };
            using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)))
            {
                pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120);
                //publish the message
                pubSubChannel.Publish(this.model, routerServiceCtx);
            }
        }
    }
}
Creating a new instance allows you to pass all the important parameters needed to create a new routing entry and send it via the Service Bus to the WCF-Routing service, where the RouterUpdate extension will receive the data and add a new service endpoint if it does not already exist.
The initialization process to create a new ServiceBusPublishSubscribeChannel is identical to the one we used to subscribe to the Service Bus topic.
/// <summary>
/// Publishes the routing entry to the router via the Service Bus.
/// </summary>
public void SendMessageToRouter()
{
    //Set up the publish/subscribe channel
    var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);
    var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" };
    using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)))
    {
        pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120);
        //publish the message
        pubSubChannel.Publish(this.model, routerServiceCtx);
    }
}
There are two differences:
- A RoutingMessageContext instance is created, and the receiver is set via the “To” property
- The Publish method of the ServiceBusPublishSubscribeChannel is used to send an instance of the RouteMeModel class over the wire; it contains the endpoint data of the service that wants to be added to the WCF-Routing service
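The RouteMeModel class lives in the Models assembly. Judging only by the properties set in the Messenger constructor, it carries four strings, so a minimal sketch of the message could look like this. Only the four property names come from the code above; the ToString override is an illustrative addition, and the real class may contain more.

```csharp
using System;

namespace Models
{
    // Sketch only: the four property names are taken from the object
    // initializer in the Messenger constructor. Everything else about
    // the real class is an assumption.
    public class RouteMeModel
    {
        // Full type name of the service contract, e.g. "ContractAssembly.IHelloWorldService"
        public string ContractName { get; set; }

        // Address of the service endpoint the router should forward to
        public string EndPointAddress { get; set; }

        // Name of the assembly that contains the contract
        public string FullAssemblyName { get; set; }

        // Name under which the service is exposed on the router
        public string ServiceName { get; set; }

        // Illustrative addition: a readable form for trace output.
        public override string ToString()
        {
            return string.Format("{0} -> {1} ({2})", ServiceName, EndPointAddress, ContractName);
        }
    }
}
```

On the router side, ServiceName is the value that ends up as the last segment of the routed address and as the endpoint name, while EndPointAddress is the target the filter table forwards to.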
The Sample WCF-Services and the WCF-Test Client
There are two WCF-Services that publish their endpoints to the WCF-Router using the ServiceMessenger assembly:
- HelloWorld
- HelloWorldExtended
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using System.ServiceModel;
using ContractAssembly;
using ServiceMessenger;
using System.ServiceModel.Description;

namespace HelloWorld
{
    public class WorkerRole : RoleEntryPoint
    {
        private string endPointAddress;
        private string listenAddress;
        private RoleInstanceEndpoint endpointAzure;

        public override void Run()
        {
            using (ServiceHost host = new ServiceHost(typeof(HelloWorld)))
            {
                Thread.Sleep(60000);
                // This is a sample worker implementation. Replace with your logic.
                Trace.TraceInformation("HelloWorldExtended entry point called", "Information");
                this.ConfigureServiceHost(host);
                while (true)
                {
                    Thread.Sleep(1000);
                    Trace.TraceInformation(endPointAddress);
                }
            }
        }

        /// <summary>
        /// Configures the service host.
        /// </summary>
        /// <param name="host">The host.</param>
        private void ConfigureServiceHost(ServiceHost host)
        {
            try
            {
                endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HelloWorldMain"];
                this.endPointAddress = string.Format("http://{0}/HelloWorld", endpointAzure.IPEndpoint);
                this.listenAddress = string.Format("http://{0}:{1}/HelloWorld/", RoleEnvironment.GetConfigurationSettingValue("Domain"), endpointAzure.IPEndpoint.Port);

                var httpBinding = new BasicHttpBinding();
                httpBinding.SendTimeout = TimeSpan.FromMinutes(1);
                httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1);

                var helloWorldExtendedEndPoint = host.AddServiceEndpoint(typeof(IHelloWorldService), httpBinding, this.endPointAddress, new Uri(this.listenAddress));
                helloWorldExtendedEndPoint.Name = "HelloWorld";

                var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld");
                messenger.SendMessageToRouter();

                ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
                smb.HttpGetEnabled = true;
                smb.HttpGetUrl = new Uri(this.endPointAddress);
                host.Description.Behaviors.Add(smb);

                host.Open();
            }
            catch (Exception ex)
            {
                Trace.TraceError(ex.Message);
            }
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // For information on handling configuration changes
            // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
            return base.OnStart();
        }
    }
}
To publish the single endpoint of each service we use two lines of code:
var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld");
messenger.SendMessageToRouter();
We create a new Messenger instance and publish the new service endpoint using the SendMessageToRouter method. Sweet! For a quick overview of the solution, please see this code-map:
