What’s in This Chapter
An important component of any cloud architecture is the means by which the various components communicate. This chapter examines the September 2011 Release of the Service Bus, a cloud-hosted platform designed to enable on-premises to on-premises, cloud-hosted to cloud-hosted, and hybrid on-premises to cloud-hosted communication scenarios.
The Windows Azure Service Bus represents a rich collection of cloud-hosted services that enables both brokered and relayed communication scenarios. This section introduces the major features Service Bus provides.
Service Bus brokered messaging takes the basic concept of a hosted queue and extends it to provide support for:
These features enable message-driven communication between managed and REST clients, both on-premises and cloud-hosted, so long as the Service Bus is accessible from the clients by TCP or outbound HTTP. Clients can be written in .NET using the Service Bus managed library communicating via TCP/IP utilizing the Service Bus client protocol. Alternatively, clients can communicate across HTTP from any platform that supports the REST style of communication.
At its core, Service Bus brokered messaging relies on three primary components: queues, topics, and subscriptions.
Figure 13-1 shows a single Service Bus queue that has two senders adding messages to the queue. The same queue also has three receivers pulling messages from it. Observe, as implied in the diagram, that queues support REST and managed clients simultaneously. The same is also true for topics and subscriptions.
In the default case, each subscription receives every message sent to the topic, and in this case functions similarly to a queue, except messages are sent to a topic and received from a subscription.
Subscriptions offer richer functionality than simply multicasting to multiple receivers across multiple subscriptions, however, because they support filters, actions, and rules.
For example, for a given topic with two subscriptions and one receiver for each subscription, when a producer sends a message to the topic, the message is received separately by both receivers from their respective subscription—unless there is a rule in place that defines a filter. Figure 13-2 demonstrates this scenario, where receiver A gets only the one message from subscription A that matches the filter condition. Receiver B gets all the messages sent to the topic from subscription B because no filter is configured.
Each rule specifies a filter, an action, or both. A rule can be defined as just a filter, with no explicit action defined, to select which messages a subscription receives. A rule can also define a filter to determine when to apply an action that manipulates message properties and values (for example, messages that match the filter have their properties altered according to the action).
Rules enable you to create subscriptions that funnel messages to different consumers. For example, you could create a subscription that accepts only high-priority messages (such as paying customers) and are processed by a near real-time consumer, whereas all other messages (such as those from demo customers) get consumed by a scheduled process. You accomplish this by having a rule with a filter that examines whatever message property you deem contains the priority flag. Those that match go to the high-priority subscription and those that do not go to the normal-priority subscription.
By leveraging topics and subscriptions, you can plug in additional subscriptions to the topic at any time. For example, if at some point you need to add a consumer that logs details about messages for auditing purposes without affecting the regular processing of messages, you would simply create a new subscription for this, leaving the other subscriptions unaffected.
The Service Bus Relayed Messaging feature set enables bidirectional communication between message senders (clients) and listeners (services) via a cloud-hosted service referred to as a relay service. In this scenario, clients and services may be hosted on-premises, in Azure roles or in some combination of the two. Senders may be implemented with WCF or with any platform supporting REST communication, whereas listeners are traditionally implemented as WCF services. Listeners and senders can communicate over TCP and HTTP/S.
The relay is designed so communication between senders and listeners can easily traverse network boundaries and firewalls. Communication between sender and listener that ordinarily traverses the cloud-hosted relay can “upgrade” to direct communication between sender and listener, for example, when the sender and listeners are on the same network (see Figure 13-3). In addition, the relay provides features for event-distribution to large numbers of listeners, and a naming registry that quickly updates to list the registered service endpoints of listeners.
The key difference between Relayed and Brokered Messaging is that the former requires both client and server to be online at the same time to exchange messages.
If you explore the Service Bus APIs or documentation, you may find references to Service Bus Message Buffers. This functionality was designed to provide a transient, in-memory buffer for decoupled communication in a style similar to queues. Message Buffers have been superseded by the brokered messaging functionality and are maintained for backward compatibility only (in other words unlikely to receive future enhancements). For new solutions, you should use the Brokered Messaging features of queues, topics, and subscriptions.
Service Bus Brokered Messaging can be programmed using both a managed library and directly from REST clients. This section demonstrates how to leverage the Service Bus first from the managed world and later from a REST client.
Before you can write your first line of code against a queue, you need to create a Service Namespace for your Service Bus. You can think of a Service Namespace as an application boundary for all Service Bus resources your application might use and as the moniker under which usage charges accrue. The important results of creating a Service Namespace are the namespace and a shared secret key that you will need for access to Service Bus services.
To create a Service Namespace, follow these steps:
In addition to providing you with these important credentials, you can also use the Azure Management Portal to examine useful runtime information about existing queues, topics, and subscriptions. For example, the Portal provides the message count and storage sizes in MB of queues and the storage size in MB of topics. Currently, it does not provide the message counts for subscriptions. You can also use the Portal to create new queues, topics, and subscriptions with your service namespace.
You should build a managed client when your code to send or receive messages runs within a .NET process or if the code has access to the .NET framework. So what does it take to build a managed queue client? This section shows how to create a queue in your Service Namespace and send messages to it. You also see how to receive messages from it. For all the samples in this chapter you can start with the Console project that Visual Studio 2010 provides, or, more simply, just use LINQPad to run the code provided as shown in the text. Either way, you need to add a reference to the Microsoft.ServiceBus.dll assembly that is available with the Windows Azure AppFabric SDK version 1.5. This assembly contains all the classes you need to interact with Service Bus Brokered Messaging. You will also likely want to add using or import statements for the namespaces Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging.
To build a build a Namespace, follow these steps:
string issuerName = "owner"; string issuerKey = "<INSERT YOUR ISSUER KEY>"; string serviceNamespace = "<INSERT YOUR SERVICE NAMESPACE>"; string queueName = "PrimaryQueue"; TokenProvider tokenProvider = TokenProvider. CreateSharedSecretTokenProvider( issuerName, issuerKey);
Code snippet Simple Queue Producer.linq
Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty); NamespaceManager namespaceManager = new NamespaceManager(serviceUri, tokenProvider); "Checking if queue already exists...".Dump(); if (namespaceManager.QueueExists(queueName)) { "Deleting queue...".Dump(); namespaceManager.DeleteQueue(queueName); } "Creating queue...".Dump(); QueueDescription queueDescription = namespaceManager.CreateQueue(queueName);
code snippet Simple Queue Producer.linq
The constructor of a NamespaceManager requires the URI to your service namespace (in the form sb://<serviceNamespace>.servicebus.windows.net/), which you can easily construct using the CreateServiceUri static method of the ServiceBusEnvironment class. Although this is just a simple URI that you could create by hand, it is recommended that you use this method instead of building the string to protect your code in case the service URI format changes in the future.
This example constructs the namespaceManager and uses it to ensure that you start from a clean slate, deleting the queue if it happens to exist from a previous demo run so you do not get an error when re-creating an existing queue. You make the call to create the queue through namespaceManager.CreateQueue(), passing it the wanted name of the queue. The name of the queue can be a path of any depth (for example, top/left/center/myqueue), so long as it less than 290 characters. The call to CreateQueue returns a QueueDescription object that provides all the properties describing the queue, all of which are simply the default values at this point. One property of interest to you is the queueDescription.Path, which contains that queueName you provided.
Now you’re ready to start sending messages to the queue. You do this by acquiring a QueueClient from a MessagingFactory instance. You can use a QueueClient to send or receive messages from queue. Note that creating a QueueClient results in you also opening a persistent TCP connection to the queue.
//Create the queue client and use it to send messages to the queue MessagingFactory messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider); QueueClient queueClient = messagingFactory.CreateQueueClient(queueDescription.Path); for (int i = 0; i < 5; i++) { BrokeredMessage msg = new BrokeredMessage(string.Format("Message #{0:00}", i)); msg.MessageId = i.ToString(); queueClient.Send(msg); string.Format("Sent message with ID: {0} and Body: {1}", msg.MessageId, msg.GetBody<string>()).Dump(); }
code snippet Simple Queue Producer.linq
Messages are represented by the BrokeredMessage class which allows you, among other things, to specify various message properties and content for the body (which must be serializable) of the message. In this example, you create a message with a body containing the string text Message #xx (passing that formatted string into the BrokeredMessage constructor) and a MessageId property with the value of the counter. To enqueue the message, you call the Send method of the QueueClient and pass it the BrokeredMessage instance.
When your loop completes, you’ll have sent all five messages and should wait for your consumers to run their code to retrieve messages before cleaning up the queue.
"Messages sent. Ensure consumer is running.".Dump(); Thread.Sleep(120000); //Cleanup sample queueClient.Close(); //Deletes queue and any messages within namespaceManager.DeleteQueue(queueName); "Deleted queue.".Dump(); "Finished.".Dump();
code snippet Simple Queue Producer.linq
The output of this should look as follows:
Checking if queue already exists... Deleting queue... Creating queue... Sent message with ID: 0 and Body: Message #00 Sent message with ID: 1 and Body: Message #01 Sent message with ID: 2 and Body: Message #02 Sent message with ID: 3 and Body: Message #03 Sent message with ID: 4 and Body: Message #04 Messages sent. Ensure consumer is running. Deleted queue. Finished.
Now that you have your queue created and a producer sending messages to it, you can build the code to read the messages from the Queue. This follows almost the same setup steps as before (get the credentials and create the TokenProvider, serviceUri), but this time you use the Receive() operation on the QueueClient. Your call to Receive indicates a long poll (where the server holds off responding immediately if there is no data ready yet), which waits for up to 10 seconds for a message. The reason you put the call to Receive within the condition of the while is to handle the case where the 10 seconds passes and no message data is returned. In this case, a null value is returned.
string issuerName = "owner"; string issuerKey = "<INSERT YOUR ISSUER KEY>"; string serviceNamespace = "<INSERT YOUR SERVICE NAMESPACE>"; string queueName = "PrimaryQueue"; TokenProvider tokenProvider = TokenProvider. CreateSharedSecretTokenProvider(issuerName, issuerKey); Uri serviceUri = ServiceBusEnvironment.CreateServiceUri( "sb", serviceNamespace, string.Empty); MessagingFactory messagingFactory = MessagingFactory.Create( serviceUri, tokenProvider); //Create a consumer in PeekLock receive mode QueueClient queueClient = messagingFactory.CreateQueueClient( queueName, ReceiveMode.PeekLock); BrokeredMessage msg; while ((msg = queueClient.Receive(TimeSpan.FromSeconds(10))) != null) { string.Format("Received message with ID: {0} and Body: {1}", msg.MessageId, msg.GetBody<string>()).Dump(); //must call msg.Complete() to delete peek-locked //message from queue msg.Complete(); } "Finished.".Dump();
code snippet Simple Queue Consumer.linq
Following are a few things worth observing in the previous code.
The output running the previous code (after the producer code finishes sending but has not yet deleted the queue) follows:
Received message with ID: 0 and Body: Message #00 Received message with ID: 1 and Body: Message #01 Received message with ID: 2 and Body: Message #02 Received message with ID: 3 and Body: Message #03 Received message with ID: 4 and Body: Message #04 Finished.
You can also retrieve messages from the queue using the ReceiveAndDelete mode instead of PeekLock. In this case, the message is retrieved and deleted from the queue in a single, atomic operation. The change alters the ReceiveMode specified in the call to CreateQueueClient and omits the call to msg.Complete().
QueueClient queueClient = messagingFactory.CreateQueueClient( queueName, ReceiveMode.ReceiveAndDelete); BrokeredMessage msg; while ((msg = queueClient.Receive(TimeSpan.FromSeconds(10))) != null) { string.Format("Received message with ID: {0} and Body: {1}", msg.MessageId, msg.GetBody<string>()).Dump(); //no need to call msg.Complete(), message already deleted }
code snippet Simple Queue Consumer.linq
Now that you understand how to program against queues, learning to program against their fuller featured cousins, topics and subscriptions, is an incremental step. For new projects, you can start with topics and subscriptions instead of queues because doing so leaves you well positioned to add additional “taps” into the topic in the form of a subscription without first needing to migrate from a queue-based implementation.
This section assumes the following constants for code snippets.
string issuerName = "owner"; string issuerKey = "<INSERT YOUR ISSUER KEY>"; string serviceNamespace = "<INSERT YOUR SERVICE NAMESPACE>"; string topicName = "PrimaryTopic"; string auditSubscriptionName = "AuditSubscription"; string regularSubscriptionName = "RegularSubscription";
code snippet Simple Topic Producer.linq
Now, you can dive into the details.
The pattern for creating a topic is nearly identical to that for creating a queue. With your authenticated NamespaceManager in hand, you simply call CreateTopic, passing in a path for the topic, where the path is subject to the same restrictions as a queue path (max length of 290 characters and must be URI-friendly).
TokenProvider tokenProvider = TokenProvider. CreateSharedSecretTokenProvider(issuerName, issuerKey); Uri serviceUri = ServiceBusEnvironment.CreateServiceUri( "sb", serviceNamespace, string.Empty); //Create topic NamespaceManager namespaceManager = new NamespaceManager( serviceUri, tokenProvider); if (namespaceManager.TopicExists(topicName)) { namespaceManager.DeleteTopic(topicName); } TopicDescription topicDescription = namespaceManager. CreateTopic(topicName);
code snippet Simple Topic Producer.linq
After you create a topic, creating a subscription is as easy calling CreateSubscription with the topic path and the name you want for the subscription. The name of a subscription is not a path; it must be a single segment and less than 50 characters.
SubscriptionDescription auditingSubscription = namespaceManager.CreateSubscription( topicDescription.Path, auditSubscriptionName);
code snippet Simple Topic Producer.linq
Sending messages to a topic is just like sending a message to a queue. You must first create a TopicClient instance from a MessagingFactory instance and then call the topic client’s Send method.
//Create the topic client and use it to send messages to the topic MessagingFactory messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider); TopicClient topicClient = messagingFactory.CreateTopicClient(topicDescription.Path); for (int i = 0; i < 5; i++) { BrokeredMessage msg = new BrokeredMessage( string.Format("Message #{0:00}", i)); msg.MessageId = i.ToString(); topicClient.Send(msg); string.Format("Sent message with ID: {0} and Body: {1}", msg.MessageId, msg.GetBody<string>()).Dump(); }
code snippet Simple Topic Producer.linq
Messages can be received from a subscription using either a receive-and-delete or peek-lock/complete receive mode. The approach is to create a SubscriptionClient instance for the topic path and subscription name, specifying the receive mode in the call to CreateSubscriptionClient. With a SubscriptionClient instance in hand, you call Receive. This example calls Receive with a timeout of 10 seconds. Receive returns a null BrokeredMessage if the timeout expires. If you use the peek-lock receive mode, you must call Complete on the BrokeredMessage instance to remove it from the subscription. Otherwise, it reappears after the configured lock duration and the same message would be processed multiple times.
SubscriptionClient auditingSubscriptionClient = messagingFactory.CreateSubscriptionClient( topicName, auditSubscriptionName, ReceiveMode.ReceiveAndDelete); SubscriptionClient regularSubscriptionClient = messagingFactory.CreateSubscriptionClient( topicName, regularSubscriptionName, ReceiveMode.PeekLock); BrokeredMessage msg; while ( (msg = auditingSubscription.Receive(TimeSpan.FromSeconds(10))) != null ) { //…process msg … } while ( (msg = regularSubscription.Receive(TimeSpan.FromSeconds(10))) != null ) { //…process msg … msg.Complete(); }
code snippet Simple Topic Consumer.linq
Subscriptions support rules, which can define filters (that evaluate an expression typically containing message properties for a match) and actions (that can modify message properties). When a filter is matched for a subscription, the subscription contains the message; otherwise the message is ignored by the subscription.
Service Bus brokered messaging supports filters and actions that use a SQL-92–based syntax to define expressions. When programming with the managed library, you can create a filter that matches any message by providing an instance of the TrueFilter class, or no message with an instance of the FalseFilter class. Behind the scenes these create SqlFilters that contain the SQL expressions 1 = 1 and 1 = 0 respectively.
You can also create more advanced expressions using the SqlFilter class. For example, use messageSource LIKE ‘A%’ to match messages that have a message property called MessageSource whose value starts with A.
SubscriptionDescription auditingSubscription = namespaceManager.CreateSubscription( topicDescription.Path, auditSubscriptionName, new TrueFilter()); SubscriptionDescription regularSubscription = namespaceManager.CreateSubscription( topicDescription.Path, regularSubscriptionName, new SqlFilter("MessageSource LIKE 'A%'"));
code snippet Filtered Subscriptions Producer.linq
Observe that the previous snippet uses an overload of CreateSubscription that takes the path, subscription name and filter.
The Service Bus provides one more type of Filter, which enables efficient matching on the complete value of a message property—in the CorrelationId BrokeredMessage property. For example, you could treat all high-priority messages differently by using a CorrelationFilter, as shown in the following snippet. This subscription would receive all messages that have a value of “VIP” in the message.CorrelationId field.
string priorityCorrelationValue = "VIP"; SubscriptionDescription prioritySubscription = namespaceManager.CreateSubscription( topicDescription.Path, prioritySubscriptionName, new CorrelationFilter(priorityCorrelationValue));
code snippet Filtered Subscriptions Producer.linq
If you need to modify message properties as part of making a message available through a subscription, you need to define an action. To define an action you must create a rule and apply it to the subscription at subscription creation time. Within the rule you specify the filter that determines if the action is applied to the message, and also specify the SqlRuleAction that can add, modify, or remove message properties. The following snippet creates a rule that applies to messages having a messageSource (a custom property we added) that starts with A (such as Affiliate), and when true add a new message property called IsAffiliate with a value of true.
RuleDescription sourceStartsWithARule = new RuleDescription() { Name = "sourceStartsWithA", Filter = new SqlFilter("MessageSource LIKE 'A%'"), Action = new SqlRuleAction("SET IsAffiliate = TRUE") }; SubscriptionDescription regularSubscription = namespaceManager.CreateSubscription( topicDescription.Path, regularSubscriptionName, sourceStartsWithARule);
code snippet Filtered Subscriptions Producer.linq
To send custom message properties or correlation values with a message, you simply set the corresponding property on the BrokeredMessage instance. The following snippet shows how to add a custom property to the Properties collection, as well as setting a CorrelationId.
for (int i = 0; i < 5; i++) { //Create messages having differing properties //and correlation values BrokeredMessage msg = new BrokeredMessage( string.Format("Message #{0:00}", i)); msg.MessageId = i.ToString(); msg.Properties["MessageSource"] = ( i == 2 || i == 3) ? "Affiliate" : "Public"; msg.CorrelationId = (i == 4) ? "VIP" : null; topicClient.Send(msg); }
code snippet Filtered Subscriptions Producer.linq
So far you’ve seen the synchronous ways to interact with the Service Bus. The managed library for Brokered Messaging offers versions of those operations that follow the standard .NET Asynchronous Programming Model. Use the async approach to program against the Service Bus for high-throughput scenarios, or within in WPF or Windows Forms applications. The following snippet provides an example of how to receive messages asynchronously from a subscription and process them within a callback.
AsyncCallback cb = null; cb = (result) => { try { BrokeredMessage msg = regularSubscription.EndReceive(result); //msg may be null if no messages received before timeout if (msg != null) { //…do something with msg … regularSubscription.BeginReceive( TimeSpan.FromSeconds(10), cb, regularSubscription); } else { //no more messages to process //…stop polling or call BeginReceive again … } } catch (Exception ex) { //handle exception } }; //Invokes callback when either a message is received //OR wait timeout expires regularSubscription.BeginReceive( TimeSpan.FromSeconds(10), cb, regularSubscription);
code snippet Async Topic Consumers.linq
REST clients are able to perform the same core functionality available to managed clients for interacting with Service Bus Brokered Messaging, particularly for sending to queues or topics and receiving from queues or subscriptions (as well as applying any rules, filters, or actions defined) and managing the creation or deletion of such messaging entities.
In this section, you follow the same process for working with a filtered subscription as was shown previously with the managed client using the managed API, but instead you see how these steps can be accomplished with a simple REST client. Along the way you see highlighted notable differences between the two APIs.
Building a REST client forces you take on the complexity of authenticating with ACS to acquire the SWT token (basically a string passcode that is sent in the request headers) that you must present with all calls to the Service Bus. In addition, you are responsible for renewing the token when necessary (something the managed library otherwise handles for you) as shown in the following code:
static string GetServiceBusTokenFromACS(string serviceNamespace, string serviceIdentityUsername, string serviceIdentityPassword) { string acsHostName = "accesscontrol.windows.net"; WebClient client = new WebClient(); client.BaseAddress = string.Format( "https://{0}-sb.{1}", serviceNamespace, acsHostName); string realm = string.Format( "http://{0}.servicebus.windows.net/", serviceNamespace); NameValueCollection values = new NameValueCollection() { {"wrap_name", serviceIdentityUsername}, {"wrap_password", serviceIdentityPassword}, {"wrap_scope", realm} }; byte[] responseBytes = client.UploadValues("WRAPv0.9/", "POST", values); string response = Encoding.UTF8.GetString(responseBytes); string token = "WRAP access_token="" + Uri.UnescapeDataString(response .Split('&') .Single(value => value.StartsWith("wrap_access_token=", StringComparison.OrdinalIgnoreCase)) .Split('=')[1]) + """; return token; }
code snippet REST APIs.linq
Before you can communicate across the Service Bus by sending or receiving messages, you need to create a messaging entity such as a queue or topic. This shows how to create a queue, and then next, a topic. You can create a queue when you know you do not need the additional features offered by subscriptions—namely filters and actions. With the code to acquire a token from ACS in hand, creating a queue is straightforward and demonstrates a pattern all the REST management APIs follow. First, you craft the URI to the Service Namespace:
string uriToNamespace = string.Format( "https://{0}.servicebus.windows.net/", serviceNamespace);
code snippet REST APIs.linq
You then use this URI, along with the queue path you want for the new queue, and the token previously acquired from ACS as inputs to the queue creation method.
static string CreateQueue(string uriToNamespace, string queuePath, string token) { string queueUri = uriToNamespace + queuePath; WebClient wc = new WebClient(); wc.Headers[HttpRequestHeader.Authorization] = token; string queueDescription = @"<entry xmlns=""http://www.w3.org/2005/Atom""> <title type=""text"">" + queuePath + @"</title> <content type=""application/xml""> <QueueDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/ 2010/10/servicebus/connect"" /> </content> </entry>"; byte[] response = wc.UploadData( queueUri, "PUT", Encoding.UTF8.GetBytes(queueDescription)); return Encoding.UTF8.GetString(response); }
code snippet REST APIs.linq
In the previous code, notice the pattern:
static string GetQueues(string uriToNamespace, string token) { string queuesUri = uriToNamespace + "/$Resources/Queues"; WebClient wc = new WebClient(); wc.Headers[HttpRequestHeader.Authorization] = token; return wc.DownloadString(rulesUri); }
code snippet REST APIs.linq
Table 13-1 presents a summary of the parameters for all REST operations. Use this table in lieu of repeating the same basic code for the other operations shown in this section.
Parameter | Description |
Entity URI | uriNamespace + queuePath |
Header | Authorization: token |
Payload | QueueDescription |
Verb | PUT |
Response | QueueDescription |
An example of Queue Description payload follows:
"<entry xmlns=""http://www.w3.org/2005/Atom""> <title type=""text"">" + queuePath + "</title> <content type=""application/xml""> <QueueDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/netservices/ 2010/10/servicebus/connect"" /> </content> </entry>"
The response to a successful creation operation, just as in the managed case, is QueueDescription. The response instance provides value for all properties of a QueueDescription. For values not supplied by the client, the server-side default values are applied to the queue and included in the response.
You generally want to create a topic when you send messages across the Service Bus because it gives you the flexibility to plug in additional subscriptions (with differing rules) at any point later. If you start with a queue and find you need a filter or want to modify message properties with an action, you must delete the queue and replace it with a topic and one or more subscriptions. This also means you need to adjust any code written that sends a message to a topic and that which receives messages to receive from a subscription. The approach for creating a topic is almost identical to creating a queue, except that you need to submit a TopicDescription instead of a QueueDescription, as shown in Table 13-2.
Parameter | Description |
Entity URI | uriNamespace + topicPath |
Header | Authorization: token |
Payload | Topic Description |
Verb | PUT |
Response | Topic Description |
A TopicDescription has the following format:
"<entry xmlns=""http://www.w3.org/2005/Atom""> <title type=""text"">" + topicPath + "</title> <content type=""application/xml""> <TopicDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/ netservices/2010/10/servicebus/connect"" /> </content> </entry>"
You create a subscription to receive messages sent to a topic and to define filters and actions. Creating a subscription is also similar to creating a queue or topic, as shown in Table 13-3. Rules for a subscription are created separately, as you see in the next section.
Parameter | Description |
Entity URI | uriNamespace + topicPath + "/Subscriptions/” + subscriptionName |
Header | Authorization: token |
Payload | SubscriptionDescription |
Verb | PUT |
Response | SubscriptionDescription |
The payload for creating a subscription is a SubscriptionDescription:
"<entry xmlns=""http://www.w3.org/2005/Atom""> <title type=""text"">" + subscriptionName + "</title> <content type=""application/xml""> <SubscriptionDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/ netservices/2010/10/servicebus/connect"" /> </content> </entry>"
With a Subscription in place, you define Rules to filter out messages from appearing in the subscription, alter properties of the message, or both. Unlike the experience with the managed client library, when using REST, rules are created in a separate call from the one creating the subscription. In the following code , the ruleText for a trivial rule could be 1 = 1, which is equivalent to a TrueFilter, or you could provide a more complex rule such as MessageSource LIKE 'A%' to evaluate message properties, as shown in Table 13-4.
Parameter | Description |
Entity URI: | uriNamespace + topicPath + "/Subscriptions/” + subscriptionName + "/Rules/” + ruleName |
Header: | Authorization: token |
Payload: | RuleDescription |
Verb: | PUT |
Response: | RuleDescription |
The RuleDescription payload takes the following form:
"<entry xmlns=""http://www.w3.org/2005/Atom""> <content type=""application/xml""> <RuleDescription xmlns:i=""http://www.w3.org/2001/XMLSchema-instance"" xmlns=""http://schemas.microsoft.com/ netservices/2010/10/servicebus/connect"" /> <Filter i:type=""SqlFilter""> <SqlExpression>" + ruleText + "</SqlExpression> <CompatibilityLevel>20</CompatibilityLevel> </Filter> <Action i:type=""EmptyRuleAction"" /> </RuleDescription> </content> </entry>";
You send messages to a queue or topic for the eventual consumption by a receiver reading from the queue or from the subscription created on the topic. Sending messages to queues or topics is simply a POST to the messages segment of the entity path combined with a timeout (specified in seconds) in the query string, as shown in Table 13-5. Message Properties, as used by the BrokeredMessage type, appear within the Header. The body can take any form so long as it serializes to a UTF 8 encoded byte array.
Parameter | Description |
Entity URI | Queue uriNamespace + queuePath + "/messages?timeout=60" Topic uriNamespace + topicPath + "/messages?timeout=60" |
Header | Authorization: tokenMessage Properties |
Payload | Message body |
Verb | POST |
Response | N/A |
wc.Headers[headerName] = """ + headerValue + """;The payload of a message sent to a queue can be as simple as "Hello world."
Rules enable you to define actions (which can alter message properties) and filters (which control the availability of messages) on a subscription. Messages can be received from queues or subscriptions using either receive-and-delete or peek-lock/complete approaches. The calling pattern, as shown in Table 13-6, is the same in both cases.
Parameter | Description |
Entity URI | Queue uriNamespace + queuePath + "/messages/head?timeout=60" Subscription uriNamespace + topicPath + "/Subscriptions/ ” + subscriptionName + "/messages/head?timeout=60" |
Header | Authorization: token Message Properties |
Payload | Empty byte array |
Verb | Receive and Delete - DELETE Peek Lock - DELETE |
Response | Headers: BrokeredProperties Message Properties Body: Message body |
An example value of BrokeredProperties follows (observe the JSON format used):
{"DeliveryCount":1, "LockToken":"6dc54ba3-6c5b-4ae4-b25f-b457586282b3", "LockedUntilUtc":"Thu, 20 Oct 2011 17:40:29 GMT", "MessageId":"18d2c9e585d840f690ab276bf71ed98c", "SequenceNumber":1, "TimeToLive":922337203685.47754}
When performing a peek-lock receive, you must capture the values of the lock token (aka lock Id) and message ID returned in the BrokeredProperties header found within the response headers to complete the receive operation later. The value of this header is a JSON formatted dictionary. Refer back to Table 13-6 for a sample. Table 13-7 shows how to complete a peek-locked message and actually delete it from the queue or subscription.
Parameter | Description |
Entity URI | Queue uriNamespace + queuePath + "/messages/" + messageId + "/" + lockId Subscription uriNamespace + topicPath + "/Subscriptions/” + subscriptionName + "/messages/" + messageId + "/" + lockId |
Header | Authorization: token |
Payload | Empty byte array |
Verb | DELETE |
Response | N/A |
You are billed for the existence of messaging entities such as queues, topics, and subscriptions. It is common sense to clean up resources you do not use or no longer apply (such as a rule). The approach used to delete any entity is to perform a DELETE operation against the entity URI, providing the token in the authorization header and an empty byte array as the payload, as Table 13-8 summarizes.
Parameter | Description |
Entity URI | Queue uriNamespace + queuePath TopicuriNamespace + topicPath Subscription uriNamespace + topicPath + "/Subscriptions/” + subscriptionName Rule uriNamespace + topicPath + "/Subscriptions/” + subscriptionName + "/Rules/” + ruleName |
Header | Authorization: token |
Payload | Empty byte array |
Verb | DELETE |
Response | N/A |
You can also request information about existing queues, topics, subscriptions (see Table 13-9) and rules using REST (see Table 13-10). You would do this to see what setting you used previously during the entity creation or to see what default values have been applied.
Parameter | Description |
Entity URI | Queues uriToNamespace + "/$Resources/Queues" Topic uriToNamespace + "/$Resources/Topics" Subscription uriNamespace + topicPath + "/Subscriptions/" |
Header | Authorization: token |
Payload | Empty byte array |
Verb | GET |
Response | An ATOM feed containing a collection of the requested descriptions. |
When requesting subscription descriptions, you must specify the topic path, which returns the subscriptions to that topic.
Parameter | Description |
Entity URI | uriNamespace + topicPath + "/Subscriptions/” + subscriptionName + "/Rules/” + ruleName |
Header | Authorization: token |
Payload | Empty byte array |
Verb | GET |
Response | ATOM feed containing collection of RuleDescription elements (which contain Filter and Action subelements) for each rule applied to the subscription |
Service Bus brokered messaging offers many more features, which are not shown in the previous code samples. This section mentions them and refers you to the resources at the end of the chapter as well as the source code included with the book for details. Each feature serves to show that there is much more to messaging with Service Bus Brokered Messaging than sending and receiving messages, and that it helps you solve tough problems such as building a request/response messaging pattern across queues or automatically dealing with poisonous messages that repeatedly crash the receiver, or even optimizing communication by batching requests.
Queues and subscriptions provide support for grouping messages into a session. Effectively this enables you to receive multiple messages related by a Session ID (specified at send time) in order, regardless of other messages that might have also been sent to the queue or topic in-between. The MessageSession class provides this functionality and is created from either a QueueClient or SubscriptionClient via the AcceptMessageSession method. This method can take a parameter indicating a wanted Session ID to process, or no parameter at all to process messages from the next session. These are referred to as named and nameless sessions respectively.
Finally, sessions have a durable property for maintaining Session State that that you can use independent of the messages within the session, or to collect statistics about the session.
The Service Bus provides support for Complete (of a peek-locked message) and for Send as part of local transactions such as those created within a TransactionScope. This enables you, for example, to delete a peek-locked message from a queue and send a message to another queue as a single atomic operation. If either step fails, the peek-locked message is not completed, and the other message is never sent. Neither receive in peek-lock nor receive-and-delete participate within a local transaction, which means, for example, that you cannot rollback a receive operation (as if the message were never acquired) if a subsequent send operation in the same transaction fails.
Dead lettering is the process to remove a message from the active queue or subscription into a special subqueue, called a dead letter queue, which is designed to capture messages that cannot be processed. The Service Bus enables you to automatically dead letter a message if its Time-To-Live (TTL) expires, if it’s received more than a certain number of times, or if the message encounters an exception when a filter expression is being evaluated for it. You can also manually dead letter messages via the BrokeredMessage instance’s DeadLetter method.
You can opt to defer the processing of a received message and control when that happens in your application by calling Defer on a BrokeredMessage instance. A deferred message is not available to other clients of a queue or subscription and must be retrieved by using its message sequence number (which needs to be captured and stored by the application upon initial retrieval, for later use).
In scenarios in which processing duplicate messages is unwanted, you can set the MessageId of sent messages to some unique value and rely on the Service Bus to ensure that no message with that MessageId reappears within a configurable window of time. (The default is 10 minutes.)
By default, the Service Bus enables client-side batching for asynchronous Send and Complete requests made with the managed client library. With this enabled, the client holds back on sending messages for small amounts of time, up to a maximum batch size, in order to send multiple messages or multiple Complete requests in a single request.
When receiving a message, you can use prefetching to return not only the next message, but also multiple messages in a single request (which will contain multiple messages). These prefetched messages are returned to Receive calls from a local cache and have a lock on them that expires (such that they again become available in the queue) after an interval of 60 seconds if they have not been received by the client who cached them.
In many cases, choosing between using the Service Bus managed client and the REST API is decided by the platform on which you build your clients (for example, you are forced to build REST if the clients are not built with .NET such as mobile phone clients). However, if both are still options there are other factors to consider; in this section you receive practical guidance to aid your selection.
The performance is lower for REST clients when compared to managed clients because the Service Bus Client Protocol maintains an open TCP connection to the Service Bus (while the MessagingFactory is open), whereas each HTTP request must open a new connection. This is an expensive operation that cannot be avoided for REST clients.
In addition, the Service Bus client protocol provides client-side batching on asynchronous send or complete and message prefetching (both introduced previously). These can significantly improve throughput by minimizing the number of round trips that need to be made for managed clients but is not available to REST clients.
Finally, if you need to support sessions, for reasons such as chunking large messages, you must use the managed client because sessionful receivers are not currently supported by the REST API.
One feature that may drive you to use the REST API is a high number of senders and receivers. Service Bus queues, topics, or subscriptions can support a maximum of 100 concurrent clients each. Therefore, if you have the potential for more connections, the REST APIs that open a connection make the request and close immediately, representing the best approach to maximize your use of concurrent connections.
Connectivity may also cause you to use REST. For most firewall-protected networks, outbound HTTP is usually allowed, but TCP may be blocked on port 9354—the port used by the Service Bus Client Protocol. In this case, you either must open that port or switch to using a REST client.
The following section discusses best practices that achieve better performance and reliability with Service Bus Brokered Messaging.
One key to understanding how to achieve the best performance with Service Bus is to understand how messages are processed and where they are stored internally. A single queue stores its data within a single database, and all messaging operations are handled by that single database instance. A topic and all its subscriptions are stored within the single database, and all messaging operations across the topic and the subscriptions are handled by the same node. Each node can handle several thousand messages per second, but an upper bound to the throughput exists, which implies the following:
On the client side, when using the Service Bus Client Protocol, recall that each messaging factory shares a single open connection irrespective of the number of queue, topic, or subscription clients created from it. To increase throughput, you should do the following:
Fundamentally, Windows Azure Storage queues and Service Bus Brokered Messaging provide the same core queuing functionality. If your solution requires only the basics, either may suffice. Scenarios beyond the basics, however, can likely benefit from the richer feature-set provided by the Service Bus. The following are the key differences:
Now that you have an understanding of using Service Bus Brokered Messaging, the following section looks at the Service Bus quotas and how your usage is measured and ultimately billed.
As of this writing, Service Bus Brokered Messaging is free of charge in production but introduces two meters you can use today to measure your consumption using the Microsoft Online Customer Portal. When pricing comes into effect, it will revolve around these meters.
The two meters are Relay Hours and Messages:
In addition, multipliers exist on these meters based on message size or storage used. Although Service Bus supports message sizes up to 256KB, costs for messages above 64KB in size and the number of message operations are multiplied for each additional 64KB sent or received.
Table 13-11 lists the Quotas enforced for features in Service Bus Brokered Messaging.
Item | Quota Limit |
Max messages exchanged | 5,000,000,000 |
Queue or Topic Size | 5GB |
Concurrent TCP (non-REST) Connections to Queue, Topic, or Subscription | 100 |
Number of Topics or Queues per Namespace | 10,000 |
Message Size | 256KB |
Message Header Size | 64KB |
Number of Subscriptions per Topic | 2,000 |
Number of SQL Filters per Topic | 2,000 |
Number of Correlation Filters per Topic | 100,000 |
In this chapter you had an overview of the major features of the Service Bus: Relay and Brokered Messaging. You then dove into Brokered Messaging and its constituent components, queues, topics, subscriptions, and rules, learning how to leverage them from managed clients using the Service Bus Client Protocol and the managed SDK, as well as by interacting with the REST API.
Along the way, you considered some of the additional features such as Sessions and Batching that make the managed scenario quite robust. You also received some guidance on how to choose between the managed and REST APIs for scenarios in which either is an option, as well as how the Service Bus feature set compares to that of the similar Windows Azure Queues.
The coverage concluded with the best practices you should follow to optimize your Service Bus performance and reliability, and understanding what quotas and billing metrics are in place with this release.
Take the knowledge you gained and start connecting your components—happy messaging!