Menu

.NET/C# Client API Guide

This page gives an overview of the RabbitMQ .NET/C# client API.

The code samples given here demonstrate connecting to RabbitMQ and performing several common operations with the client.

The library is open-source, and is dual-licensed under the Apache License v2 and the Mozilla Public License v1.1.

The client is dual-licensed under

The client API is closely modelled on the AMQP 0-9-1 protocol specification, with additional abstractions for ease of use.

This section gives an overview of the RabbitMQ .NET client API. Only the basics of using the library are covered: for full detail, please see the javadoc-like API documentation generated from the source code.

Major namespaces, interfaces and classes

The core API interfaces and classes are defined in the RabbitMQ.Client namespace:

using RabbitMQ.Client;
The core API interfaces and classes are
  • IModel: represents an AMQP 0-9-1 channel, and provides most of the operations (protocol methods).
  • IConnection: represents an AMQP 0-9-1 connection
  • ConnectionFactory: constructs IConnection instances
  • IBasicConsumer: represents a message consumer
Other useful interfaces and classes include:
  • DefaultBasicConsumer: commonly used base class for consumers
Public namespaces other than RabbitMQ.Client include:
  • RabbitMQ.Client.Events: various events and event handlers that are part of the client library, including EventingBasicConsumer, a consumer implementation built around C# event handlers.
  • RabbitMQ.Client.Exceptions: exceptions visible to the user.

All other namespaces are reserved for private implementation detail of the library, although members of private namespaces are usually made available to applications using the library in order to permit developers to implement workarounds for faults or design mistakes they discover in the library implementation. Applications cannot rely on any classes, interfaces, member variables etc. that appear within private namespaces remaining stable across releases of the library.

Connecting to a Broker

To connect to a RabbitMQ, it is necessary to instantiate a ConnectionFactory and configure it to use desired hostname, virtual host, and credentials. Then use ConnectionFactory.CreateConnection() to open a connection. The following two code snippets connect to a RabbitMQ node on hostName:

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;

IConnection conn = factory.CreateConnection();
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = "amqp://user:pass@hostName:port/vhost";

IConnection conn = factory.CreateConnection();

Since the .NET client uses a stricter interpretation of the AMQP 0-9-1 URI spec than the other clients, care must be taken when using URIs. In particular, the host part must not be omitted and virtual hosts with empty names are not addressable. All factory properties have default values. The default value for a property will be used if the property remains unassigned prior to creating a connection:

Username
"guest"
Password
"guest"
Virtual host
"/"
Hostname
"localhost"
port
5672 for regular connections, 5671 for connections that use TLS

The IConnection interface can then be used to open a channel:

IModel channel = conn.CreateModel();
The channel can now be used to send and receive messages, as described in subsequent sections.

Using Exchanges and Queues

Client applications work with exchanges and queues, the high-level building blocks of AMQP 0-9-1. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary. Continuing the previous example, the following code declares an exchange and a queue, then binds them together.

model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
model.QueueDeclare(queueName, false, false, false, null);
model.QueueBind(queueName, exchangeName, routingKey, null);
This will actively declare the following objects:
  • a non-durable, non-autodelete exchange of "direct" type
  • a non-durable, non-autodelete, non-exclusive queue
The exchange can be customised by using additional parameters. The above code then binds the queue to the exchange with the given routing key. Note that many channel API (IModel) methods are overloaded. The convenient short form of ExchangeDeclare uses sensible defaults. There are also longer forms with more parameters, to let you override these defaults as necessary, giving full control where needed. This "short version, long version" pattern is used throughout the API.

Publishing Messages

To publish a message to an exchange, use IModel.BasicPublish as follows:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
For fine control, you can use overloaded variants to specify the mandatory flag, or specify messages properties:
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);
This sends a message with delivery mode 2 (persistent) and content-type "text/plain". See the definition of the IBasicProperties interface for more information about the available message properties.

In the following example, we publish a message with custom headers:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude",  51.5252949);
props.Headers.Add("longitude", -0.0905493);

model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);

Code sample below sets a message expiration:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000"

mode.BasicPublish(exchangeName,
                  routingKey, props,
                  messageBodyBytes);

Fetching Individual Messages ("pull API")

To retrieve individual messages, use IModel.BasicGet. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:

bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    byte[] body = result.Body;
    ...
Since noAck = false above, you must also call IModel.BasicAck to acknowledge that you have successfully received and processed the message:
    ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}
Note that fetching messages using this API is relatively inefficient. If you'd prefer RabbitMQ to push messages to the client, see the next section.

Retrieving Messages By Subscription ("push API")

Another way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively. One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // ... process the message
                    channel.BasicAck(ea.DeliveryTag, false);
                };
String consumerTag = channel.BasicConsume(queueName, false, consumer);
Another option is to subclass DefaultBasicConsumer, overriding methods as necessary, or implement IBasicConsumer directly. You will generally want to implement the core method IBasicConsumer.HandleBasicDeliver. More sophisticated consumers will need to implement further methods. In particular, HandleModelShutdown traps channel/connection closure. Consumers can also implement HandleBasicCancelOk to be notified of cancellations. The ConsumerTag property of DefaultBasicConsumer can be used to retrieve the server-generated consumer tag, in cases where none was supplied to the original IModel.BasicConsume call. You can cancel an active consumer with IModel.BasicCancel:
channel.BasicCancel(consumerTag);
When calling the API methods, you always refer to consumers by their consumer tags, which can be either client- or server-generated as explained in the AMQP 0-9-1 specification document.

Concurrency Considerations for Consumers

Each IConnection instance is, in the current implementation, backed by a single background thread that reads from the socket and dispatches the resulting events to the application. If heartbeats are enabled, as of version 3.5.0 they are implemented in terms of .NET timers. Usually, therefore, there will be at least two threads active in an application using this library:

the application thread
contains the application logic, and makes calls on IModel methods to perform protocol operations.
the I/O activity thread
hidden away and completely managed by the IConnection instance.

The one place where the nature of the threading model is visible to the application is in any callback the application registers with the library. Such callbacks include:

  • any IBasicConsumer method
  • the BasicReturn event on IModel
  • any of the various shutdown events on IConnection, IModel etc.

Consumer Callbacks and Ordering

As of version 3.5.0 application callback handlers can invoke blocking operations (such as IModel.QueueDeclare or IModel.BasicCancel). IBasicConsumer callbacks are invoked concurrently. However, per-channel operation order is preserved. In other word, if messages A and B were delivered in this order on the same channel, they will be processed in this order. If messages A and B were delivered on different channels, they can be processed in any order (or in parallel). Consumer callbacks are invoked in tasks dispatched to the default TaskScheduler provided by the .NET runtime.

Using a Custom Task Scheduler

It is possible to use a custom task scheduler by setting ConnectionFactory.TaskScheduler:

public class CustomTaskScheduler : TaskScheduler
{
  // ...
}

var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();
This, for example, can be used to limit concurrency degree with a custom TaskScheduler.

Sharing Channels Between Threads

As a rule of thumb, IModel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for IModel instances. If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion itself. One way of achieving this is for all users of an IModel to lock the instance itself:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}
Symptoms of incorrect serialisation of IModel operations include, but are not limited to,
  • invalid frame sequences being sent on the wire (which occurs, for example, if more than one BasicPublish operation is run simultaneously), and/or
  • NotSupportedExceptions being thrown from a method in class RpcContinuationQueue complaining about "Pipelining of requests forbidden" (which occurs in situations where more than one AMQP 0-9-1 synchronous operation, such as ExchangeDeclare, is run simultaneously).

Handling Unroutable Messages

If a message is published with the "mandatory" flag set, but cannot be delivered, the broker will return it to the sending client (via a basic.return AMQP 0-9-1 command). To be notified of such returns, clients can subscribe to the IModel.BasicReturn event. If there are no listeners attached to the event, then returned messages will be silently dropped.

model.BasicReturn +=
  new RabbitMQ.Client.Events.BasicReturnEventHandler(...);
The BasicReturn event will fire, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.

Disconnecting from RabbitMQ

To disconnect, simply close the channel and the connection:

channel.Close(200, "Goodbye");
conn.Close();
Note that closing the channel is considered good practice, but isn't strictly necessary - it will be done automatically anyway when the underlying connection is closed. In some situations, you may want the connection to close automatically once the last open channel on the connection closes. To achieve this, set the IConnection.AutoClose property to true, but only after creating the first channel:
IConnection conn = factory.CreateConnection(...);
IModel channel = conn.CreateModel();
conn.AutoClose = true;
When AutoClose is true, the last channel to close will also cause the connection to close. If it is set to true before any channel is created, the connection will close then and there.

Automatic Recovery From Network Failures

Connection Recovery

Network connection between clients and RabbitMQ nodes can fail. RabbitMQ .NET/C# client supports automatic recovery of connections and topology (queues, exchanges, bindings, and consumers). The automatic recovery process for many applications follows the following steps:

  1. Reconnect
  2. Restore connection listeners
  3. Re-open channels
  4. Restore channel listeners
  5. Restore channel basic.qos setting, publisher confirms and transaction settings
Topology recovery includes the following actions, performed for every channel
  1. Re-declare exchanges (except for predefined ones)
  2. Re-declare queues
  3. Recover all bindings
  4. Recover all consumers
To enable automatic connection recovery, set ConnectionFactory.AutomaticRecoveryEnabled to true:
ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = factory.CreateConnection();
If recovery fails due to an exception (e.g. RabbitMQ node is still not reachable), it will be retried after a fixed time interval (default is 5 seconds). The interval can be configured:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

Topology Recovery

Topology recovery involves recovery of exchanges, queues, bindings and consumers. It is enabled by default but can be disabled:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.CreateConnection();
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled  = false;

Manual Acknowledgements and Automatic Recovery

When manual acknowledgements are used, it is possible that network connection to RabbitMQ node fails between message delivery and acknowledgement. After connection recovery, RabbitMQ will reset delivery tags on all channels. This means that basic.ack, basic.nack, and basic.reject with old delivery tags will cause a channel exception. To avoid this, RabbitMQ .NET client keeps track of and updates delivery tags to make them monotonically growing between recoveries. IModel.BasicAck, IModel.BasicNack, and IModel.BasicReject then translate adjusted delivery tags into those used by RabbitMQ. Acknowledgements with stale delivery tags will not be sent. Applications that use manual acknowledgements and automatic recovery must be capable of handling redeliveries.

Common ways of working with AMQP 0-9-1

When building distributed systems with RabbitMQ, there are a number of different messaging patterns that crop up over and over again. In this section, we cover some of the most common coding patterns and interaction styles:

  • Point-to-point messaging: both remote procedure call (RPC), and asynchronous messages directed toward a particular receiver.
  • Event broadcasting: one-to-many interactions; transmission of messages directed implicitly to a set of interested receivers, with collection of zero or more possible responses.
  • Responsibility transfer: choosing which piece of the network is responsible for any given message.
  • Message transfer: at-least-once and at-most-once message delivery.
  • Preserving atomicity and idempotence when interacting with external resources.
Limited library support is also available for working with these patterns, in the RabbitMQ.Client.MessagePatterns namespace:
  • Subscription provides a high-level interface to receiving messages from the server.
  • SimpleRpcServer builds on Subscription to implement an RPC or one-way service.
  • SimpleRpcClient builds on Subscription to interact with remote services.
Future releases of the RabbitMQ .NET client library will include improved high-level support for the most common messaging patterns and their variations.

Point-to-point Messaging

The point-to-point messaging pattern occurs when the publisher of a message has a particular receiving application in mind - for instance, when a RPC-style service is made available via the AMQP server, or when an application in a workflow chain receives a work item from its predecessor and sends the transformed work item to its successor.

Synchronous, Client-Server Remote Procedure Call (RPC)

In order to perform request/response RPC,
  • some means of addressing the service must be available
  • some means of receiving a reply must be available
  • some means of correlating the request message to the reply message must be available

Addressing the service

Since AMQP messages are published using a pair of an exchange name and a routing key, this is sufficient for addressing a service. Using a simple exchange-name/routing-key combination permits a number of different ways to implement the service while presenting the same interface to clients. For instance, the service could be implemented as a single process consuming from a queue, and load-balancing internally, or it could be multiple processes consuming from a single queue, being handed requests round-robin style, thereby load balancing without special coding in the service logic. Messages can be addressed to the service request queue either
  • directly, using the AMQP default exchange (""); or
  • indirectly, by using a service-specific exchange, which leaves the routing-key free for such purposes as method selection or additional service-specific addressing information; or
  • indirectly, by using an exchange shared by multiple services, with the service name encoded in the routing key.
Using an exchange other than the default exchange permits other applications to receive copies of each request message, which can be useful for monitoring, auditing, logging and debugging.

Ensuring a service instance is listening

AMQP 0-9-1 publish operation (IModel.BasicPublish) provides a delivery flag, "mandatory", which can be used to ensure service availability at the time a request is sent by a client. Setting the "mandatory" flag causes a request to be returned if it cannot be routed to a queue. Returned messages appear as basic.return commands, which are made visible to the application via the IModel.BasicReturn event on the IModel that was used to publish the message.

Since published messages are returned to clients via basic.return method, and basic.return is an asynchronous negative-acknowledgement event, the absence of a basic.return for a particular message cannot be taken as a confirmation of delivery: the use of delivery flags only provides a way of raising the bar, rather than eliminating failure entirely.

Additionally, the fact that a message was flagged "mandatory", and successfully enqueued on one or more queues, is no guarantee of its eventual receipt: most trivially, the queue could be deleted before the message is processed, but other situations, like the use of the noAck flag by a message consumer, can also make the guarantee provided by "mandatory" conditional.

Alternatively, one could use Publisher Confirms. Setting a channel into confirm mode by calling IModel.ConfirmSelect causes the broker to send a Basic.Ack after each message is processed by delivering to a ready consumer or by persisting to disk. Once a successfully processed message has been confirmed via the IModel.BasicAcks event handler, the broker has assumed responsibility for the message and the client may consider the message handled. Note that the broker may also negatively acknowledge a message by sending back a Basic.Nack. In this case, if a message is rejected via the IModel.BasicNacks event handler, the client should assume that the message was lost or otherwise undeliverable. Also, note that unroutable messages - messages published as mandatory to non-existing queues - are both Basic.Return'ed and Basic.Ack'ed.

Receiving Replies

AMQP 0-9-1 content header (IBasicProperties) contains a field called ReplyTo, which can be used to tell the service where to post a reply to a received RPC request. Across current RabbitMQ client libraries, the most widely-used formats for the string in the ReplyTo header is a simple queue name, although passing an exchange name and routing key joined by application-specific rules is also an option. The service instance will post its reply to the named destination, and the requesting client should arrange to receive messages so addressed, using either BasicGet or BasicConsume on an appropriately-bound queue.

Correlating a received reply to a transmitted request

IBasicProperties contain a field called CorrelationId, which in AMQP 0-9-1 is an unstructured string that can be used to match a request to a reply. A reply message should have the same CorrelationId as the one that was attached to the request message.

Asynchronous, one-way messaging

In some situations, a simple request-reply interaction pattern is inappropriate for your application. In these cases, the interaction pattern of interest can be constructed from asynchronous, one-way, point-to-point messages. If an application is to respond to both synchronous, RPC-style requests, and asynchronous one-way requests, it should use the value of ReplyTo to decide which interaction style is being requested of it: if ReplyTo is present and non-empty, the request can be assumed to be an RPC-style call; otherwise, it should be assumed to be a one-way message. The CorrelationId field can be used to group together a number of related messages, just as for the RPC-style case, but more generally tying together an arbitrary number of messages.

Acknowledgment modes for point-to-point

AMQP can operate in one of two modes, when receiving messages from the server: auto-acknowledgement mode (when the noAck flag has been set on BasicGet, BasicConsume, or the Subscription constructor), or manual-acknowledgement mode. Choosing the right acknowledgement mode is important for your application:
  • auto-acknowledgement mode means that the server will internally mark a message as successfully delivered as it transmits it across the network. Messages delivered in auto-acknowledgement mode will not generally be redelivered to any other receiver.
  • manual-acknowledgement mode means that the server will wait for positive confirmation of receipt before marking a message as successfully delivered. Messages delivered in manual-acknowledgement mode will be re-queued if the channel (IModel) they were delivered on is closed before an acknowledgement is received by the server.
In general,
  • if a service is in manual-acknowledgement mode, it should not acknowledge the request message until it has replied to it; see the section below on interaction with external resources.
  • a client may use auto-acknowledgement mode, depending on the consequences of a retransmission of the request message.

Library support for point-to-point messaging

The RabbitMQ .NET client library includes basic support for common tasks involving point-to-point messaging.

SimpleRpcServer

The class RabbitMQ.Client.MessagePatterns.SimpleRpcServer implements synchronous RPC-style request handling as well as asynchronous message handling. Users should subclass SimpleRpcServer, overriding one or more of the methods with names beginning with "Handle". SimpleRpcServer instances have a request-dispatching loop, MainLoop, which interprets a request as an RPC-style request needing a reply if the ReplyTo field of the request's IBasicProperties is non-null and non-empty. Requests with absent or empty ReplyTo fields are treated as one-way. When an RPC-style request has been processed, the reply is sent to the ReplyTo address. The reply address is first matched against a regular-expression describing the URI-like syntax given above; if it matches, the components of the URI-like syntax are used as the reply address, and if it does not, the whole string is used as a simple queue name, and the reply is sent to the default exchange ("") with a routing-key equal to the ReplyTo string.

SimpleRpcClient

The class RabbitMQ.Client.MessagePatterns.SimpleRpcClient implements code for interacting with SimpleRpcServers or similar. RPC-style interactions are performed with the Call methods. A (private) Subscription is set up to receive replies from the service, and the ReplyTo field is set to point to the subscription. The CorrelationId field of the request is initialised to a fresh GUID. Asynchronous/one-way interactions are simply passed on to IModel.BasicPublish without modification: it is up to the caller to set CorrelationId in the asynchronous case. The class currently has no support for setting the "mandatory" flag on a published request message, nor for handling any BasicReturn events that might arise from setting that flag. The code that retrieves replies from the internal Subscription currently cannot deal with multiple simultaneously outstanding RPC requests, because it requires that replies arrive in the same order as the requests were sent out. Do not attempt to pipeline requests sent through a single instance of SimpleRpcClient until this restriction has been lifted. See also the overridable protected method SimpleRpcClient.RetrieveReply. The basic pattern for using SimpleRpcClient is as follows:
using (IConnection conn = new ConnectionFactory()
                                .CreateConnection(args[0])) {
    using (IModel ch = conn.CreateModel()) {

        SimpleRpcClient client = new SimpleRpcClient(ch, /* ... */);
        // in the line above, the "..." indicates the parameters
        // used to specify the address to use to route messages
        // to the service.

        // The next three lines are optional:
        client.TimeoutMilliseconds = 5000; // defaults to infinity
        client.TimedOut += new EventHandler(TimedOutHandler);
        client.Disconnected += new EventHandler(DisconnectedHandler);

        byte[] replyMessageBytes = client.Call(requestMessageBytes);
        // other useful overloads of Call() and Cast() are
        // available. See the code documentation of SimpleRpcClient
        // for full details.
    }
}
Note that a single SimpleRpcClient instance can perform many (sequential) Call() and Cast() requests! It is recommended that a single SimpleRpcClient be reused for multiple service requests, so long as the requests are strictly sequential.

Event Broadcasting

The event broadcasting pattern occurs when an application wishes to indicate a state change or other notification to a pool of applications without knowing precisely the addresses of each interested party. Applications interested in a certain subset of events use exchanges and queue-bindings to configure which events are routed to their own private queues.

Generally, events will be broadcast through topic exchanges, although direct exchanges, while less flexible, can sometimes perform better for applications where their limited pattern-matching capability is sufficient.

Publishing events

To publish an event, first ensure the exchange exists, then determine an appropriate routing key. For example, for stocks, a key such as `stock.ibm.nyse` might be appropriate; for other applications, other topic hierarchies will naturally arise. topic exchanges are commonly used. Then publish the message. For example:
using (IConnection conn = new ConnectionFactory()
                                .CreateConnection(args[0])) {
    using (IModel ch = conn.CreateModel()) {

        IBasicProperties props = ch.CreateBasicProperties();
        FillInHeaders(props); // or similar
        byte[] body = ComputeBody(props); // or similar

        ch.BasicPublish("exchangeName",
                        "chosen.routing.key",
                        props,
                        body);
    }
}
See the documentation for the various overloads of BasicPublish on class RabbitMQ.Client.IModel.

Subscription

The class RabbitMQ.Client.MessagePatterns.Subscription implements most of the boilerplate of receiving messages (including, in particular, broadcast events) for you, including consumer declaration and management, but excluding queue and exchange declaration and queue binding. For example,
// "IModel ch" in scope.
Subscription sub = new Subscription(ch, "STOCK.IBM.#");
foreach (BasicDeliverEventArgs e in sub) {
  // handle the message contained in e ...
  // ... and finally acknowledge it
  sub.Ack(e);
}
will start a consumer on the queue using IModel.BasicConsume. It is assumed that the queue and any bindings have been previously declared. Subscription.Ack() should be called for each received event, whether or not auto-acknowledgement mode is used, because Subscription internally knows whether an actual network message for acknowledgement is required, and will take care of it for you in an efficient way so long as Ack() is always called in your code. For full details, please see the code documentation for the Subscription class.

Retrieving events with a custom consumer

Sometimes the high-level approach using Subscription is sufficient. Other times, however, there is a need to use a custom consumer. This approach to retrieving events is to bind a queue to the exchange concerned with an appropriate routing-key pattern specification. For instance, assuming that our application wanted to retrieve all prices regarding IBM on queue "MyApplicationQueue":
// "IModel ch" in scope.
ch.ExchangeDeclare("prices", "topic");
ch.QueueDeclare("MyApplicationQueue", false, true, true, null);
ch.QueueBind("MyApplicationQueue", "prices",
             "STOCK.IBM.#", false, null);
... followed by consumption of messages from "MyApplicationQueue" using BasicGet or BasicConsume. A more full example is given in the ApiOverview chapter.

Acknowledgment modes for event broadcasting

The same auto-acknowledgement/manual-acknowledgement decision as for point-to-point messaging is available for consumers of broadcast events, but the pattern of interaction introduces different tradeoffs:

  • for high-volume messaging where it is occasionally acceptable to not receive one of the messages one is interested in, auto-acknowledgement mode makes sense
  • for scenarios where every message matching our subscription needs to be delivered, manual-acknowledgement is appropriate

For more information, see the section on reliable message transfer below. Note also that class Subscription takes care of acknowledgement and the various acknowledgement modes for you, so long as Subscription.Ack() is called for each received message.

Reliable message transfer

Messages can be transported between endpoints with different quality-of-service (QoS) levels. In general, failure cannot be completely ruled out, but it is important to understand the various delivery failure modes to understand the kinds of recovery from failure that are required, and the kinds of situation for which recovery is possible. To reiterate: it is not possible to completely rule out failure. The best that can be done is to narrow the conditions in which failure can occur, and to notify a system operator when failure is detected.

At-least-once delivery

This QoS level assures that a message is delivered to its ultimate destination at least once. That is, a receiver may receive multiple copies of the message. If it is important that a side-effect only occur once for a given message, at-most-once delivery should be used instead.

To implement at-least-once delivery:
  • publish the message as usual, with some correlation identifier and reply-to address on it so that the receiver can acknowledge receipt to the sender. when receiving a message, send an acknowledgement message back to the sender. If the message is an RPC request, then the RPC reply message is implicitly an acknowledgement of receipt of the request.
  • Alternatively, rather than implement the round-trip logic manually, a client may use Publisher Confirms. By enabling Confirm mode on a channel, a client is requesting that the broker acknowledge or negatively acknowledge all messages sent on the channel from that point on. See the instructions from Responsibility Transfer on how to use confirms.
Deciding on a message-resend policy can be difficult. Some simple resend strategies are:
  • resend if your connection is lost or some other crash occurs before you receive confirmation of receipt
  • timeout and resend if you do not receive a confirmation within a few seconds. Make sure to double the timeout for each resend, to help avoid retry-related denial-of-service and network congestion.

At-most-once delivery

For at-most-once delivery, simply publish the message, once, as usual. No correlation identifier is required. Receive the message in the consuming application, paying attention to the Redelivered flag on the delivery. The Redelivered flag will only be clear when the server believes that it is offering a message for consumption for the very first time. If any attempt at delivery has been made before, the Redelivered flag will be set. The Redelivered flag is a very limited piece of information, giving only at-most-once semantics.

Coding with multi-node RabbitMQ clusters

In situations where continuous service is desired, the possibility of a server failure can be hedged against with some careful programming and the availability of a warm-standby cluster for failover.

The main concerns when failing over are
  • atomicity of published/acknowledged work units, and
  • availability of configured resources on the backup server

Message producers should take care to use transactions in order to receive positive confirmation of receipt of a group of messages from a server, and should keep a record of the exchanges, queues and bindings they need to have available in order to perform their work, so that on failover, the appropriate resources can be declared before replaying the most recent transactions to recover.

Message consumers should be aware of the possibility of missing or duplicate messages when failing over: a publisher may decide to resend a transaction whose outcome is in doubt, or a transaction the publisher considered complete could disappear entirely due to failure of a cluster node.

Interacting with external resources

A common pattern for a service is to
  • receive a service request via a queue
  • update some external resource, such as a file or database
  • reply over RabbitMQ, or at a minimum, acknowledge to the server that the message triggering the action has been completed

Often elements of the at-least-once pattern appear in conjunction with the external-resource pattern - specifically, the side-effects discussed in the section on reliable message transfer above are often effects on an external resource.

In cases where a delivery must be processed no more than once and used in conjunction with an external resource, it's important to write code that is able at each step to determine whether the step has already been taken in some previous attempt at completing the whole transaction, and if it has, to be able to omit it in this attempt and proceed to the next step. For example:

  • If a work-triggering request went missing, another copy will (eventually) arrive from the ultimate requestor.
  • If the work was already performed, for instance a database table was updated, in a previous receipt of the work item in question, the service needs to keep a record of completion of the external work that is atomic with respect to the atomic work itself: for instance, within the same database transaction, some log of honoured requests could be updated, or the row being modified could be updated to include the ID of the request that caused the modification, as well as previous request-IDs that modified the row in question.

This makes it important to be able to compress request IDs so that they do not take unbounded space in the log of performed work, and so that we do not need to introduce a full distributed garbage-collection protocol with the ultimate requestor. One way of doing this is to choose to use request IDs that are strictly increasing, so that a "high water mark" can be used. Once the work is known to have been performed, and a reply has been produced (if there is one), the reply can be sent back to the requestor as many times as necessary. The requestor knows which replies it is expecting, and can discard unwanted duplicates. So long as duplicates of the same request always receive identical reply messages, the replier need not be careful about sending too many copies of the reply. Once the reply has been sent to the server, the request message can be acknowledged as received and processed with the server server. In cases where there is no reply to a request, the acknowledgement is still useful to ensure that requests are not lost.

A RabbitMQ Binding for WCF

The Windows Communication Foundation (WCF) enabled protocol independent service oriented applications to be built; RabbitMQ .NET client extends the framework by providing a Binding and Transport Binding Element over RabbitMQ. In the language of WCF, a Binding is a stack of Binding Elements which control all aspects of the service’s communication (for example, Security, Message Format and Transactions). A specialized kind of Binding Element, the Transport Binding Element specifies the protocol to be used for communication between a service and its clients (for example WS-HTTP, MSMQ or .Net Remoting over TCP).

The RabbitMQ Binding provides OneWay ("Fire and Forget"), TwoWay (Request/Reply) and Duplex (Asynchronous Callback) communication over RabbitMQ with WS-ReliableSessions, WS-AtomicTransactions and Text message encoding. The binding can be configured from imperative code or using the standard WCF Configuration model.

A Transport Binding Element is also supplied and can be used in the construction of custom bindings if the channel stack provided by the RabbitMQ Binding is insufficient. The transport binding must be configured with a broker hostname and broker port prior to use.

Status and Known Limitations

  1. A TwoWay or Duplex service cannot have SessionMode = SessionMode.NotAllowed since a Reliable Session is required to maintain the reply channel.
  2. Only SOAP Formatting is available, other formatters can be specified by building a CustomBinding on top of the RabbitMQTransportBindingElement
  3. Service queue parameters (e.g. durability) are not configurable
  4. Network faults are not reported to the binding

The RabbitMQ WCF binding has limited flexibility compared to the RabbitMQ .NET RabbitMQ client library. You are advised to use the .NET RabbitMQ client library if you require greater flexibility (e.g. control over durability of service queue) or if you require long-term support. The WCF binding is not under active development.

Building the Binding and Samples

The RabbitMQ binding to WCF and associated samples can be built automatically using Nant. For more information about Nant, visit http://nant.sourceforge.net/. To build the library and Sample Applications from a console window, change to the RabbitMQ.net drop location and execute:

nant build-wcf
nant wcf-examples

  • src\wcf\RabbitMQ.ServiceModel\RabbitMQ.ServiceModel.csproj
  • src\wcf\Test\RabbitMQ.ServiceModel.Test.csproj
The WCF Binding is built into the RabbitMQ.ServiceModel.dll assembly and copied to the bin directory of the RabbitMQ.ServiceModel project and the sample applications are built into the bin directory of the Test project. To run the sample applications (verifying the build and your environment configuration) execute the RabbitMQ.ServiceModel.Test.exe application. By default, the sample applications use a test broker which must be running at localhost. You can modify the broker hostname and port by opening and editing the appSettings section of the Application Configuration file (App.Config) for the Test Project.

The ABCs of WCF

Each Windows Communication Foundation service is built from three components, an Address, Behaviours and a Contract. For more information, see Windows Communication Foundation Essentials.

Contract

A service contract is an interface decorated with the ServiceContractAttribute and has one or more methods (or property accessors) decorated with the OperationContract attribute. Typically the contract exists in an assembly that can be shared between client and server applications.

[ServiceContract]
public interface ICalculator
{
  [OperationContract]
  int Add(int x, int y);

  [OperationContract]
  int Subtract(int x, int y);
}

Behaviour

The contract for a service specifies what the operations the service agrees to provide, the behaviour specifies the implementation for that service. A behaviour is a class implementing the contract and optionally decorated with the ServiceBehavior attribute.

[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)]
public sealed class Calculator : ICalculator
{
  public int Add(int x, int y)
  {
    return x + y;
  }

  public int Subtract(int x, int y)
  {
    return x - y;
  }
}

Address

For a service to be useful, it must be reachable and therefore hosted. The two common hosting scenarios for WCF services are IIS and ServiceHost. IIS Hosting is untested and unsupported by the RabbitMQ binding and using System.ServiceModel.ServiceHost is the recommended hosting path. A service host instance is constructed with the type of service behaviour being hosted and a description of the endpoint(s) it will be published on. The endpoints consist of Addresses (e.g. soap.amp:///MyService) and Bindings; they may be specified directly as constructor arguments in imperative code or declaratively through WCF configuration files, both are supported by the RabbitMQ binding.

Service Addressing

Services hosted using the RabbitMQ binding must be hosted at addresses under the soap.amqp scheme. The amq.direct exchange is used. The service name must not be omitted.

serviceAddress = “soap.amqp:///” serviceName

Sample Services

The sample services referred to in this section are located in the src\wcf\Test project.

One Way Services

Operations on a service can be marked as One Way; this means there will be no response from the service when the operation completes. One Way operations always have return type void and have an OperationContract attribute with IsOneWay set equal to true decorating them.

[OperationContract(IsOneWay=true)]
void Log(LogData entry);

If a service only contains one way operations the RabbitMQ binding can be used in an optimized OneWayOnly mode. In this mode, no reply queue is created for responses to be sent back to the client and the client does not listen for responses from the service. To enable OneWayOnly mode set the binding property or use the oneWay configuration attribute.

<rabbitMQBinding>
  <binding name="rabbitMQConfig"
           hostame="localhost"
           port="5672"
           username="guest"
           password="guest"
           virtualHost="/"
           oneWay="true"
           maxmessagesize="8192" />
</rabbitMQBinding>

The OneWayTest sample application is a simple logging service. Clients submit log entries to a server which displays them on the console. It demonstrates one way RPC over RabbitMQ, SOAP encoding to transmit complex data types over the wire and singleton instance context mode.

Two Way Services

Typically a service operates in a bi-directional, two way fashion where requests from the client are synchronously executed and a response returned to the caller. To support these services, the RabbitMQ binding uses the CompositeDuplexBindingElement , which constructs a uniquely named reply queue on the broker. Two Way services are not supported by the binding when it is in OneWayOnly mode. The TwoWayTest sample application is a calculator service, whose operations take a pair of integers and return a third.

Sessionful Services

Each call to a service can be considered independent of all others with the service maintaining no state, often a more useful service maintains some state between calls. The RabbitMQ binding supports WS-ReliableSessions enable the object instances used to service requests to have a session-long lifetime and be associated with a single client session. The SessionTest sample application is a cart service, allowing items to be added to a cart and a total calculated.

Duplex Services

A call to a two way service might start a long running process (for example, aggregating prices from a list of suppliers) and whilst the client requires a response, it is desirable that the client is not blocked for the duration of the call; instead, an asynchronous call is desired. Duplex services allow the service to make calls to the client, and have a contract whose ServiceContract specifies a CallbackContract type.

[ServiceContract(CallbackContract=typeof(IOrderCallback))]
public interface IOrderService

Duplex services are supported by the RabbitMQ binding because its channel stack includes the composite duplex binding element, they are not supported in OneWayOne mode. The DuplexTest sample application is an ordering service, which makes a callback to the client when an order is fulfilled.

Using the RabbitMQ Binding

Services

The recommended hosting scenario for services over RabbitMQ is self hosting using System.ServiceModel.ServiceHost. The ServiceHost must specify a base or absolute endpoint address under the soap.amqp scheme. An endpoint should then be added to the service using the RabbitMQBinding.

var service = new ServiceHost(typeof(Calculator), new Uri("soap.amqp:///"));
var binding = new RabbitMQBinding("localhost", 5672, "guest", "guest", "/",
                                  8192, Protocols.AMQP_0_9_1);

service.AddServiceEndpoint(typeof(ICalculator), binding, "Calculator");

Clients

The recommended pattern for connecting to a service is by deriving from either ClientBase<T> or DuplexClientBase<T>. For duplex clients, the InstanceContext must be specified.

Configuration Files

Specifying details like the protocol version and broker address in source code tends to result in services which are very hard to manage and deploy. To avoid this, WCF provides a configuration mechanism using application configuration files (App.Config). The configuration file must be applied to the host or client assembly (typically an executable) and not to a library which contains the service contract or behaviours. To declaratively configure a service, the RabbitMQBindingSection must be imported into the system.serviceModel section of the configuration file.

<extensions>
  <bindingExtensions>
    <add
      name="rabbitMQBinding"
      type="RabbitMQ.ServiceModel.RabbitMQBindingSection, RabbitMQ.ServiceModel, Version=1.0.110.0, Culture=neutral, PublicKeyToken=null"/>
  </bindingExtensions>
</extensions>

With the extension imported, the rabbitMQBinding can be declared and configured:

<bindings>
  <rabbitMQBinding>
    <binding
      name="rabbitMQConfig"
      hostname="localhost"
      port="5672"
      maxmessagesize="8192"
      version="AMQP_0_9_1" />
  </rabbitMQBinding>
</bindings>

Service Configuration

A service is configured by declaring the contract, endpoint and binding. Multiple services and bindings can be specified in a single configuration file.

<services>
  <service name="Calculator">
    <host>
      <baseAddresses>
        <add baseAddress="soap.amq:///" />
      </baseAddresses>
    </host>
    <endpoint
      address="Calculator"
      binding="rabbitMQBinding"
      bindingConfiguration="rabbitMQConfig"
      contract="ICalculator"/>
  </service>
</services>

To run the service, simply create a new ServiceHost instance passing in the service behaviour (as specified in config).

host = new ServiceHost(typeof(Calculator));
host.Open();

Client Configuration

To build a client whose settings are derived from configuration, expose a constructor for your ClientBase<T> derived class calling the ClientBase(string).

public class CalculatorClient : ClientBase<ICalculator>, ICalculator
{

    public CalculatorClient(string configurationName)
        : base(configurationName) { }

Construct the class passing the client endpoint name as specified in configuration.

<client>
  <endpoint address="soap.amq:///Calculator"
            binding="rabbitMQBinding"
            bindingConfiguration="rabbitMQConfig"
            contract=" ICalculator"
            name="AMQPCalculatorService" />
</client>

The RabbitMQ WCF libraries also have full support for the WCF Configuration Editor Tool.