Service Bus and WCF: using queues

This is the first in what will be a series of articles about using WCF with the Service Bus for Windows Server (SBWS); most of it should apply to the Azure Service Bus (ASB) as well.

I could cover the setup and configuration of the SBWS, but for now I consider it out of scope. Unless the need arises, I will not explain the basic functionality of the SBWS either. Seeing as working with the SB and WCF is a somewhat more opinionated subject, I assume you have some knowledge of queues, topics and the problems with queueing in general. So this article series requires you to have a basic idea of what the Service Bus for Windows Server is and what it does. If not, I advise you to look at Clemens Vasters' excellent introductory video on Channel 9. (Thumbs up for mister Vasters, the community appreciates the effort you put into this subject!!)

How I think about WCF & the SBWS is as follows: before we dive into messaging using WCF and the Service bus, it is important to understand that we’re essentially combining two technologies that solve some of the same problems.

The SB expands the connectivity that WCF offers, but it offers nothing comparable to WCF's hosting & activation model. That is why this combination is important.

The first thing you need to know is that WCF’s addressing does not create a queue in the SB. You’re probably used to passing the endpoint address to the ServiceHost, after which the ServiceHost takes care of registering the port and address for you. In WCF, whenever a queueing system is involved (MSMQ, SB or other) you’ll need to manage your own queues (or topics or …) unless that functionality is specifically provided by the binding. With NetMessagingBinding it is not provided.

To create a queue (or a topic or just about anything else in the SB) you’ll need to use the SB’s own APIs.

The example I got this information from can be found here:

https://github.com/Azure-Samples/azure-servicebus-messaging-samples/tree/master/NetMessagingBinding

Although the similarities with MSMQ & WCF are quite clear, to my knowledge this is not explicitly documented anywhere else.

Two last disclaimers:

  • I will do all configuration in code. There are some samples on the web that configure WCF through config files. I personally don’t like that as the default way of configuring my services.
  • By no means do I consider this production-worthy code. My aim is to give you a good understanding of the connectivity model and get you up and running fast. There are so many customizations possible with WCF and .NET in general that it is hard not to bias you when delivering production-ready code; that is why I chose to deliver demo code.

Preparing the setup:

string ServerFQDN = System.Net.Dns.GetHostEntry(string.Empty).HostName;
int tcpPort = 11354;
int HttpPort = 11355;
string ServiceNamespace = "dbengineering";

I am using non-default values for the service bus. The default 9354 port was already in use on my system by RabbitMQ, so I had to work around that. The ServiceNamespace is the default namespace I set at installation time.
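To make the effect of these values a bit more tangible, here is a small sketch of what the resulting SBWS connection string roughly looks like. It is hand-rolled for illustration only: the class and method names are made up, and the exact key order the real ServiceBusConnectionStringBuilder emits may differ.

```csharp
using System;

public static class SbwsConnectionStringSketch
{
    // Illustration only: in real code, let ServiceBusConnectionStringBuilder.ToString()
    // produce this string. The shape below is an assumption based on what
    // Get-SBClientConfiguration typically prints for an on-premises farm.
    public static string Build(string host, string serviceNamespace, int runtimePort, int managementPort)
    {
        return string.Format(
            "Endpoint=sb://{0}/{1};StsEndpoint=https://{0}:{3}/{1};RuntimePort={2};ManagementPort={3}",
            host, serviceNamespace, runtimePort, managementPort);
    }
}
```

Note that the runtime (TCP) port is not part of the sb:// endpoint itself; it travels as a separate RuntimePort setting, while the STS endpoint carries the management (HTTPS) port.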

On a side note: I have a RabbitMQ broker configured on my system, with the AMQP 1.0 plugin enabled. According to the documentation of RabbitMQ, the NetMessagingBinding does not work with the RabbitMQ plugin. I have not verified this yet, but will do so in the near future. If it doesn’t work out, maybe I’ll invest time into writing an Amqp.net Lite WCF binding… Who knows :).

How to create a queue:

static NamespaceManager CreateNamespaceManager(ServiceBusConnectionStringBuilder connBuilder)
{
   return NamespaceManager.CreateFromConnectionString(connBuilder.ToString());
}
 
// Create the entity (queue)
static QueueDescription CreateQueue(NamespaceManager namespaceManager, bool requireSession, string queueName)
{
   QueueDescription queueDescription = new QueueDescription(queueName) { RequiresSession = requireSession };
 
   // Create the queue only if it does not exist yet. The catch blocks swallow the
   // race where another process creates or deletes it between the check and the call.
   try
   {
      if (!namespaceManager.QueueExists(queueName))
         queueDescription = namespaceManager.CreateQueue(queueDescription);
   }
   catch (MessagingEntityNotFoundException)
   { }
   catch (MessagingEntityAlreadyExistsException)
   { }
 
   return queueDescription;
}

ServiceBusConnectionStringBuilder connBuilder = new ServiceBusConnectionStringBuilder();
connBuilder.ManagementPort = HttpPort;
connBuilder.RuntimePort = tcpPort;
connBuilder.Endpoints.Add(new UriBuilder { Scheme = "sb", Host = ServerFQDN, Path = ServiceNamespace }.Uri);
connBuilder.StsEndpoints.Add(new UriBuilder { Scheme = "https", Host = ServerFQDN, Port = HttpPort, Path = ServiceNamespace }.Uri);
 
NamespaceManager namespaceClient = CreateNamespaceManager(connBuilder);
QueueDescription description = CreateQueue(namespaceClient, false, "ServiceBusQueueSample");

Ok, so now you’ve created a queue called ServiceBusQueueSample, good for you! 🙂

At this point, it is worth pointing to the Service Bus Explorer by Paolo Salvatori.

They are easy to miss, so pay attention to the important notes at the top of mr. Salvatori’s page:

  • The source code for the Service Bus Explorer 3.0.4. This version of the tool uses the Microsoft.ServiceBus.dll 3.0.4 that is compatible with the current version of the Azure Service Bus, but not with the Service Bus 1.1, that is, the current version of the on-premises version of the Service Bus.
  • The Service Bus Explorer 2.1.3.0. This version can be used with the Service Bus 1.1. The Service Bus Explorer 2.1 uses the Microsoft.ServiceBus.dll client library which is compatible with the Service Bus for Windows Server 1.1 RTM version. You can download the source code of the Service Bus Explorer 2.1.3 from my OneDrive.

From <https://code.msdn.microsoft.com/windowsapps/Service-Bus-Explorer-f2abca5a/description>

So be aware that you cannot use the latest version of the service bus explorer when testing/developing against the SBWS. You will need version 2.1.3.0 if you develop against the SBWS.

Now you should have created a queue, and you should be able to use the Service Bus Explorer (SBE in short) to visualize that queue.

If you don’t get to this point, leave a message below. I will detail the steps to get here.

Publishing

So let’s start pushing data to that queue… First, you’ll need to define a service contract.

[ServiceContract]
   public interface ITemperatureService
   {
      [OperationContract(IsOneWay = true)]
      void UpdateTemperature(SensorData data);
   }

And SensorData is a simple DataContract:

[DataContract]
   public class SensorData
   {
      [DataMember]
      public string Name { get; set; }
      [DataMember]
      public DateTime TimeStamp { get; set; }
      [DataMember]
      public int Value { get; set; }
   }

Now we are able to push some data to the queue (notice how we haven’t made the server yet?)…

string baseAddress = "sb://laptop:11354/dbengineering";
Console.WriteLine("Press enter to start");
Console.ReadKey();
NetMessagingBinding binding = new NetMessagingBinding();
binding.TransportSettings.BatchFlushInterval = new TimeSpan(0, 0, 2);
binding.OpenTimeout = new TimeSpan(0, 0, 5);
binding.PrefetchCount = -1;
 
EndpointAddress epa = new EndpointAddress(baseAddress + "/ServiceBusQueueSample");
 
var factory = new ChannelFactory<ITemperatureService>(binding, epa);
factory.Endpoint.EndpointBehaviors.Add(new TransportClientEndpointBehavior
{
   TokenProvider = TokenProvider.
   CreateWindowsTokenProvider(new List<Uri> { new UriBuilder { Scheme = "https", Host = ServerFQDN, Port = HttpPort, Path = ServiceNamespace  }.Uri }, new System.Net.NetworkCredential("username", "password"))
});
 
factory.Open();
var chan = factory.CreateChannel();
for (int i = 0; i < 10; i++)
{
   using (new OperationContextScope((IContextChannel)chan))
   {
      chan.UpdateTemperature(new SensorData { Name = "abcdef", TimeStamp = DateTime.Now, Value = i });
   }
}
factory.Close();

Result: the queue now holds the ten messages sent by the loop above (you can check this in the SBE).

Think about it for a minute: your server does not need to be running to get to this state, your service bus has to run. This allows a very nice disconnected scenario.

Being the server (receiving and processing messages):

This should not be unfamiliar to you, as this is the basic WCF buildup:

  1. Create the connection builder,
  2. parameterize the connection to the service bus,
  3. host the service.

string ServerFQDN = System.Net.Dns.GetHostEntry(string.Empty).HostName;
int tcpPort = 11354;
int HttpPort = 11355;
string ServiceNamespace = "dbengineering";
 
ServiceBusConnectionStringBuilder connBuilder = new ServiceBusConnectionStringBuilder();
connBuilder.ManagementPort = HttpPort;
connBuilder.RuntimePort = tcpPort;
connBuilder.Endpoints.Add(new UriBuilder { Scheme = "sb", Host = ServerFQDN, Path = ServiceNamespace }.Uri);
connBuilder.StsEndpoints.Add(new UriBuilder { Scheme = "https", Host = ServerFQDN, Port = HttpPort, Path = ServiceNamespace }.Uri);
 
NamespaceManager namespaceClient = CreateNamespaceManager(connBuilder);
QueueDescription description = CreateQueue(namespaceClient, false, "ServiceBusQueueSample");
 
string baseAddress = "sb://laptop:11354/" + ServiceNamespace + "/";
NetMessagingBinding binding = new NetMessagingBinding();
 
// I will dive deep into the settings of the NetMessagingBinding in later 
// posts. For now, these values should work; feel free to play around with them!
binding.TransportSettings.BatchFlushInterval = new TimeSpan(0, 0, 2);
binding.OpenTimeout = new TimeSpan(0, 0, 5);
binding.PrefetchCount = -1;
binding.Namespace = ServiceNamespace;
 
// ServiceHost<T> is not part of System.ServiceModel; the standard ServiceHost works here.
var host = new ServiceHost(typeof(TemperatureService));
var epa = new EndpointAddress(baseAddress + "ServiceBusQueueSample");
 
Uri serviceUri = new Uri("sb://laptop:11354/dbengineering/ServiceBusQueueSample");
 
host.AddServiceEndpoint(typeof(ITemperatureService), binding, epa.Uri, serviceUri);
host.Description.Endpoints.First().EndpointBehaviors.Add(new TransportClientEndpointBehavior { TokenProvider = TokenProvider.CreateWindowsTokenProvider(new List<Uri> { new UriBuilder { Scheme = "https", Host = ServerFQDN, Port  =HttpPort, Path = ServiceNamespace }.Uri }, new System.Net.NetworkCredential("username", "password")) });
host.Description.Behaviors.Add(new ErrorServiceBehavior());
         
host.Open();
Console.WriteLine("Opened");
Console.WriteLine(host.State);
Console.ReadLine();
host.Close();
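The host above refers to a TemperatureService class that is not shown in this article. A minimal sketch could look like the following; the implementation (just printing what arrives) is my assumption for demo purposes, and the contract types are repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Runtime.Serialization;
using System.ServiceModel;

// Contract types repeated from earlier in the article so the snippet is self-contained.
[ServiceContract]
public interface ITemperatureService
{
    [OperationContract(IsOneWay = true)]
    void UpdateTemperature(SensorData data);
}

[DataContract]
public class SensorData
{
    [DataMember] public string Name { get; set; }
    [DataMember] public DateTime TimeStamp { get; set; }
    [DataMember] public int Value { get; set; }
}

// Hypothetical implementation: it only writes each received reading to the console.
public class TemperatureService : ITemperatureService
{
    public void UpdateTemperature(SensorData data)
    {
        Console.WriteLine("{0} @ {1:o}: {2}", data.Name, data.TimeStamp, data.Value);
    }
}
```

With something like this in place, starting the server after the client has run should print one line per queued message.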

I have added an ErrorServiceBehavior behavior to be able to debug error messages coming from the binding itself. This should help you troubleshoot issues if they arise. This is its code:

public class ErrorServiceBehavior : IServiceBehavior
{
   //From the MSDN documentation
   // Use the Validate method to examine the description before Windows Communication
   // Foundation (WCF) constructs the executing service to confirm that it can execute
   // properly.
   // Use the AddBindingParameters method to pass to a binding element the custom 
   // information for the service so that it can support the service correctly.
   // Use the ApplyDispatchBehavior method to change run-time property values or insert
   // custom extension objects such as error handlers, message or parameter interceptors, 
   // security extensions, and other custom extension objects.
 
   // The methods in here are listed in order of calling them (by WCF middleware)
   public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
   {   }
 
   public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters)
   {    }
 
   // The addressfiltermode cannot be applied here, as the Host is already initialized.
   public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
   {
      // Error handlers need to be set up before the host is opened but after
      // it is initialized.
      // Here it's hooking into the dispatcher's list (like an event system)
      foreach (ChannelDispatcher dispatcher in serviceHostBase.ChannelDispatchers)
      {
         dispatcher.IncludeExceptionDetailInFaults = true;
         bool value = dispatcher.ReceiveContextEnabled;
         dispatcher.ErrorHandlers.Add(new ErrorHandler());
      }
   }
}

public class ErrorHandler : IErrorHandler
{
   public bool HandleError(Exception error)
   {
      if (!error.GetType().Equals(typeof(CommunicationException)))
      {
         // Handle the exception as required by the application
         Console.WriteLine(error.ToString());
      }
 
      return true;
   }
 
   public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
   {
   }
}

Problems found:

When you’re working with the SBWS and WCF for the first time, you might run into these issues:

  • When the base address of the server is not the same as the one on the client, the server will not receive the messages.
    • There is an error message on the base address on the client side, but not on the server side.
  • Credentials need to be provided :).
  • The port to send to has to be the service bus’ TCP (runtime) port, else you’ll get a socket closed exception. As far as I know, the NetMessagingBinding speaks Microsoft’s own SBMP protocol over that port by default, not AMQP.
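One way to avoid the first and last problem is to build the queue address in a single place and use it on both the client and the server. The helper below is only a sketch; the class and method names are made up for this example.

```csharp
using System;

public static class ServiceBusAddresses
{
    // Builds sb://<host>:<runtimePort>/<namespace>/<queue> so that client and
    // server cannot drift apart on host name, port or namespace.
    public static Uri QueueAddress(string host, int runtimePort, string serviceNamespace, string queueName)
    {
        return new UriBuilder
        {
            Scheme = "sb",
            Host = host,
            Port = runtimePort,              // the TCP (runtime) port, not the HTTPS one
            Path = serviceNamespace + "/" + queueName
        }.Uri;
    }
}
```

The client would then create its EndpointAddress from ServiceBusAddresses.QueueAddress(ServerFQDN, tcpPort, ServiceNamespace, "ServiceBusQueueSample"), and the server would pass the same Uri to AddServiceEndpoint, instead of both sides hard-coding a string.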

So, we now have a very simple one-method service contract. Let’s quickly take a look at what happens when you add a second method to the same contract on the same queue.

Multiple methods on the contract

Initially, the contract looked like:

[ServiceContract]
   public interface ITemperatureService
   {
      [OperationContract(IsOneWay = true)]
      void UpdateTemperature(SensorData data);
   }

After adding an extra method to the service, it looks like:

[ServiceContract]
   public interface ITemperatureService
   {
      [OperationContract(IsOneWay = true)]
      void UpdateTemperature(SensorData data);
 
      [OperationContract(IsOneWay = true)]
      void UpdatePowerConsumption(SensorData data);
   }

Message on the wire for UpdatePowerConsumption

<?xml version="1.0" encoding="utf-8"?>
<UpdatePowerConsumption xmlns="http://tempuri.org/">
  <data xmlns:b="http://schemas.datacontract.org/2004/07/Wcf.Contracts" xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
    <b:Name>zxy</b:Name>
    <b:TimeStamp>2016-04-22T11:41:52.338523+02:00</b:TimeStamp>
    <b:Value>10</b:Value>
  </data>
</UpdatePowerConsumption>

Message on the wire for UpdateTemperature

<?xml version="1.0" encoding="utf-8"?>
<UpdateTemperature xmlns="http://tempuri.org/">
  <data xmlns:b="http://schemas.datacontract.org/2004/07/Wcf.Contracts" xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
    <b:Name>abcdef</b:Name>
    <b:TimeStamp>2016-04-22T11:41:52.3445584+02:00</b:TimeStamp>
    <b:Value>1</b:Value>
  </data>
</UpdateTemperature>

These messages will be delivered to their respective method and be dealt with there.

Notice how we haven’t changed the queue address. The conclusion is that a single queue can be shared by multiple service methods (operations) of the same contract.
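As far as I understand it, WCF can tell the two kinds of messages apart because each operation gets its own Action, which by default is derived from the contract namespace, the contract name and the method name. The sketch below lists those actions using the standard ContractDescription API; the ActionLister helper is made up for this example, and the contract is a trimmed copy that uses a plain string parameter, since the parameter type does not influence the action.

```csharp
using System;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.Description;

// Trimmed copy of the two-method contract from the article.
[ServiceContract]
public interface ITemperatureService
{
    [OperationContract(IsOneWay = true)]
    void UpdateTemperature(string data);

    [OperationContract(IsOneWay = true)]
    void UpdatePowerConsumption(string data);
}

public static class ActionLister
{
    // Returns the Action of the request message of every operation on the contract.
    public static string[] ActionsOf(Type contractType)
    {
        return ContractDescription.GetContract(contractType)
            .Operations
            .Select(op => op.Messages[0].Action)
            .ToArray();
    }
}
```

Calling ActionLister.ActionsOf(typeof(ITemperatureService)) should yield http://tempuri.org/ITemperatureService/UpdateTemperature and http://tempuri.org/ITemperatureService/UpdatePowerConsumption, because the contract sets no explicit Namespace, Name or Action.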