
.NET/C# Client API Guide

Overview

This guide covers the RabbitMQ .NET/C# client and its public API. It assumes that the most recent major version of the client is used and that the reader is familiar with the basics.

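Key sections of the guide are:

  • Dependencies
  • Major namespaces, interfaces and classes
  • Connecting to RabbitMQ
  • Connection and Channel Lifespan
  • Client-Provided Connection Name
  • Using Exchanges and Queues
  • Publishing Messages
  • Retrieving Messages By Subscription ("push API")
  • Concurrency Considerations for Consumers
  • Automatic Recovery From Network Failures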

An API reference is available separately.

.NET Version Requirements

The 6.x release series of this library requires .NET 4.6.1+ or a .NET Standard 2.0+ implementation. For 5.x releases, the requirements are .NET 4.5.1+ or a .NET Standard 1.5+ implementation.

License

The library is open source, developed on GitHub, and is double-licensed under the Apache License 2.0 and the Mozilla Public License 2.0.

This means that the user can consider the library to be licensed under either of the licenses listed above. For example, the user may choose the Apache License 2.0 and include this client in a commercial product.

Dependencies

The client has a couple of dependencies:

Applications that use different versions of the same dependencies should use assembly version redirection, automatic or explicit.

Major namespaces, interfaces and classes

The client API is closely modelled on the AMQP 0-9-1 protocol model, with additional abstractions for ease of use.

An API reference is available separately.

The core API interfaces and classes are defined in the RabbitMQ.Client namespace:

using RabbitMQ.Client;

The core API interfaces and classes are:

  • IModel: represents an AMQP 0-9-1 channel, and provides most of the operations (protocol methods)
  • IConnection: represents an AMQP 0-9-1 connection
  • ConnectionFactory: constructs IConnection instances
  • IBasicConsumer: represents a message consumer

Other useful interfaces and classes include:

  • DefaultBasicConsumer: commonly used base class for consumers

Public namespaces other than RabbitMQ.Client include:

  • RabbitMQ.Client.Events: various events and event handlers that are part of the client library, including EventingBasicConsumer, a consumer implementation built around C# event handlers.
  • RabbitMQ.Client.Exceptions: exceptions visible to the user.

All other namespaces are reserved for private implementation detail of the library, although members of private namespaces are usually made available to applications using the library in order to permit developers to implement workarounds for faults and gaps they discover in the library implementation. Applications cannot rely on any classes, interfaces, member variables etc. that appear within private namespaces remaining stable across releases of the library.

Connecting to RabbitMQ

Before an application can use RabbitMQ, it has to open a connection to a RabbitMQ node. The connection then will be used to perform all subsequent operations. Connections are meant to be long-lived. Opening a connection for every operation (e.g. publishing a message) would be very inefficient and is highly discouraged.

To open a connection with the .NET client, first instantiate a ConnectionFactory and configure it to use desired hostname, virtual host, credentials, TLS settings, and any other parameters as needed.

Then call the ConnectionFactory.CreateConnection() method to open a connection. Successful and unsuccessful client connection events can be observed in server logs.

The following two code snippets connect to a RabbitMQ node. The first configures the connection through individual factory properties such as HostName; the second uses an AMQP URI:

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;

IConnection conn = factory.CreateConnection();

ConnectionFactory factory = new ConnectionFactory();
// all connection parameters are taken from the URI
factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");

IConnection conn = factory.CreateConnection();

Using Lists of Endpoints

It is possible to specify a list of endpoints to use when connecting. The first reachable endpoint will be used. In case of connection failures, using a list of endpoints makes it possible for the application to connect to a different node if the original one is down.

To use multiple endpoints, provide a list of AmqpTcpEndpoints to ConnectionFactory#CreateConnection. An AmqpTcpEndpoint represents a hostname and port pair.

ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";

var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
  new AmqpTcpEndpoint("hostname"),
  new AmqpTcpEndpoint("localhost")
};
IConnection conn = factory.CreateConnection(endpoints);

Since the .NET client uses a stricter interpretation of the AMQP 0-9-1 URI spec than the other clients, care must be taken when using URIs. In particular, the host part must not be omitted and virtual hosts with empty names are not addressable.

All factory properties have default values. The default value for a property will be used if the property remains unassigned prior to creating a connection:

Property       Default Value
Username       "guest"
Password       "guest"
Virtual host   "/"
Hostname       "localhost"
Port           5672 for regular ("plain TCP") connections, 5671 for connections with TLS enabled

Note that user guest can only connect from localhost by default. This is to limit well-known credential use in production systems.

The IConnection interface can then be used to open a channel:

IModel channel = conn.CreateModel();

The channel can now be used to send and receive messages, as described in subsequent sections.

Just like connections, channels are meant to be long-lived. Opening a new channel for every operation would be highly inefficient and is highly discouraged. Channels, however, can have a shorter life span than connections. For example, certain protocol errors will automatically close channels. If applications can recover from them, they can open a new channel and retry the operation.

This is covered in more detail in the Channel guide as well as other guides such as Consumer Acknowledgements.

Disconnecting from RabbitMQ

To disconnect, simply close the channel and the connection:

channel.Close();
conn.Close();

Disposing channel and connection objects is not enough; they must be explicitly closed with the API methods from the example above.

Note that closing the channel is considered good practice but isn't strictly necessary here: it will be done automatically when the underlying connection is closed.

Client disconnection events can be observed in server node logs.

Connection and Channel Lifespan

Connections are meant to be long-lived. The underlying protocol is designed and optimized for long running connections. That means that opening a new connection per operation, e.g. a message published, is unnecessary and strongly discouraged as it will introduce a lot of network roundtrips and overhead.

Channels are also meant to be long-lived but since many recoverable protocol errors will result in channel closure, channel lifespan could be shorter than that of its connection. Closing and opening new channels per operation is usually unnecessary but can be appropriate. When in doubt, consider reusing channels first.

Channel-level exceptions such as attempts to consume from a queue that does not exist will result in channel closure. A closed channel can no longer be used and will not receive any more events from the server (such as message deliveries). Channel-level exceptions will be logged by RabbitMQ and will initiate a shutdown sequence for the channel (see below).

Client-Provided Connection Name

RabbitMQ nodes have a limited amount of information about their clients:

  • their TCP endpoint (source IP address and port)
  • the credentials used

This information alone can make identifying applications and instances problematic, in particular when credentials can be shared and clients connect over a load balancer but Proxy protocol cannot be enabled.

To make it easier to identify clients in server logs and management UI, AMQP 0-9-1 client connections, including the RabbitMQ .NET client, can provide a custom identifier. If set, the identifier will be mentioned in log entries and management UI. The identifier is known as the client-provided connection name. The name can be used to identify an application or a specific component within an application. The name is optional; however, developers are strongly encouraged to provide one as it would significantly simplify certain operational tasks.

RabbitMQ .NET client provides a connection factory property, ConnectionFactory.ClientProvidedName, which, if set, controls the client-provided connection name for all new connections opened by this factory.

Here's a modified connection example used above which provides such a name:

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;

// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";

IConnection conn = factory.CreateConnection();

Using Exchanges and Queues

Client applications work with exchanges and queues, the high-level building blocks of the protocol. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary.

Continuing the previous example, the following code declares an exchange and a queue, then binds them together.

channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, routingKey, null);

This will actively declare the following objects:

  • a non-durable, non-autodelete exchange of "direct" type
  • a non-durable, non-autodelete, non-exclusive queue

The exchange can be customised by using additional parameters. The above code then binds the queue to the exchange with the given routing key.

Many channel API (IModel) methods are overloaded. The convenient short form of ExchangeDeclare uses sensible defaults. There are also longer forms with more parameters, to let you override these defaults as necessary, giving full control where needed.

This "short version, long version" pattern is used throughout the API.
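As a rough sketch of the long form, the snippet below declares a durable exchange and a durable queue using named arguments and then binds them. The class name TopologySketch and the entity names "orders", "orders.created" and "created" are hypothetical; the parameter names are assumed to match the IModel method signatures of the 6.x client.

```csharp
using RabbitMQ.Client;

public static class TopologySketch
{
    // Declares a durable direct exchange and a durable queue, then binds them.
    // All entity names below are hypothetical placeholders.
    public static void DeclareDurableTopology(IModel channel)
    {
        channel.ExchangeDeclare(
            exchange: "orders",
            type: ExchangeType.Direct,
            durable: true,
            autoDelete: false,
            arguments: null);

        channel.QueueDeclare(
            queue: "orders.created",
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        channel.QueueBind(
            queue: "orders.created",
            exchange: "orders",
            routingKey: "created",
            arguments: null);
    }
}
```

Named arguments make the meaning of each boolean flag visible at the call site, which can help when several flags are passed in a row.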

Passive Declaration

Queues and exchanges can be declared "passively". A passive declare simply checks that the entity with the provided name exists. If it does, the operation is a no-op. For queues, successful passive declares return the same information as non-passive ones, namely the number of consumers and the number of messages in ready state in the queue.

If the entity does not exist, the operation fails with a channel level exception. The channel cannot be used after that. A new channel should be opened. It is common to use one-off (temporary) channels for passive declarations.

IModel#QueueDeclarePassive and IModel#ExchangeDeclarePassive are the methods used for passive declaration. The following example demonstrates IModel#QueueDeclarePassive:

var response = channel.QueueDeclarePassive("queue-name");
// the number of messages in Ready state in the queue
uint messageCount = response.MessageCount;
// the number of consumers the queue has
uint consumerCount = response.ConsumerCount;

IModel#ExchangeDeclarePassive's return value contains no useful information. Therefore, if the method returns and no channel exception occurs, the exchange exists.
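One possible way to combine a passive declaration with a one-off channel is sketched below. The helper name QueueExists is hypothetical, and the sketch assumes that a missing queue surfaces as an OperationInterruptedException whose shutdown reason carries the AMQP 0-9-1 reply code 404 (NOT_FOUND); other channel-level errors are deliberately not swallowed.

```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

public static class PassiveDeclareSketch
{
    // Returns true if the queue exists, false if the server reports NOT_FOUND.
    // A temporary channel is used because a failed passive declare closes
    // the channel it was issued on.
    public static bool QueueExists(IConnection conn, string queueName)
    {
        IModel tempChannel = conn.CreateModel();
        try
        {
            tempChannel.QueueDeclarePassive(queueName);
            return true;
        }
        catch (OperationInterruptedException e)
            when (e.ShutdownReason?.ReplyCode == 404)
        {
            // The server closed the temporary channel: the queue does not exist.
            return false;
        }
        finally
        {
            // Close the channel only if the server has not already closed it.
            if (tempChannel.IsOpen)
            {
                tempChannel.Close();
            }
        }
    }
}
```

Because the check runs on its own channel, the application's long-lived channel stays usable whether or not the queue exists.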

Operations with Optional Responses

Some common operations also have a "no wait" version which won't wait for server response. For example, to declare a queue and instruct the server to not send any response, use

channel.QueueDeclareNoWait(queueName, true, false, false, null);

The "no wait" versions are more efficient but offer lower safety guarantees, e.g. they are more dependent on the heartbeat mechanism for detection of failed operations. When in doubt, start with the standard version. The "no wait" versions are only needed in scenarios with high topology (queue, binding) churn.

Deleting Entities and Purging Messages

A queue or exchange can be explicitly deleted:

channel.QueueDelete("queue-name", false, false);

It is possible to delete a queue only if it is empty:

channel.QueueDelete("queue-name", false, true);

or if it is not used (does not have any consumers):

channel.QueueDelete("queue-name", true, false);

A queue can be purged (all of its messages deleted):

channel.QueuePurge("queue-name");

Publishing Messages

To publish a message to an exchange, use IModel.BasicPublish as follows:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);

For fine control, you can use overloaded variants to specify the mandatory flag, or specify message properties:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);

This sends a message with delivery mode 2 (persistent) and content-type "text/plain". See the definition of the IBasicProperties interface for more information about the available message properties.

In the following example, we publish a message with custom headers:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string, object>();
props.Headers.Add("latitude",  51.5252949);
props.Headers.Add("longitude", -0.0905493);

channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);

The code sample below sets a message expiration:

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
// expiration is a string holding a value in milliseconds (36000000 ms = 10 hours)
props.Expiration = "36000000";

channel.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);

Retrieving Messages By Subscription ("push API")

The recommended and most convenient way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively.

One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
    {
        var body = ea.Body.ToArray();
        // copy or deserialise the payload
        // and process the message
        // ...
        channel.BasicAck(ea.DeliveryTag, false);
    };
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = channel.BasicConsume(queueName, false, consumer);

Another option is to subclass DefaultBasicConsumer, overriding methods as necessary, or implement IBasicConsumer directly. You will generally want to implement the core method IBasicConsumer.HandleBasicDeliver.

More sophisticated consumers will need to implement further methods. In particular, HandleModelShutdown traps channel/connection closure. Consumers can also implement HandleBasicCancelOk to be notified of cancellations.

The ConsumerTag property of DefaultBasicConsumer can be used to retrieve the server-generated consumer tag, in cases where none was supplied to the original IModel.BasicConsume call.
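A minimal sketch of the subclassing approach might look like the following. The class name LoggingConsumer is hypothetical, and the HandleBasicDeliver signature shown (with a ReadOnlyMemory<byte> body) is assumed to be that of the 6.x client; earlier versions pass the body as a byte array.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Hypothetical consumer that prints each delivery and acknowledges it.
public class LoggingConsumer : DefaultBasicConsumer
{
    public LoggingConsumer(IModel channel) : base(channel) { }

    public override void HandleBasicDeliver(
        string consumerTag, ulong deliveryTag, bool redelivered,
        string exchange, string routingKey,
        IBasicProperties properties, ReadOnlyMemory<byte> body)
    {
        // Copy the payload before this method returns.
        string text = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine($"[{consumerTag}] {routingKey}: {text}");

        // Model is the channel this consumer was constructed with.
        Model.BasicAck(deliveryTag, false);
    }

    public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
    {
        base.HandleModelShutdown(model, reason);
        Console.WriteLine($"Channel closed: {reason.ReplyText}");
    }
}
```

Such a consumer would be registered the same way as an EventingBasicConsumer, for example with channel.BasicConsume(queueName, false, new LoggingConsumer(channel)).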

You can cancel an active consumer with IModel.BasicCancel:

channel.BasicCancel(consumerTag);

When calling the API methods, you always refer to consumers by their consumer tags, which can be either client- or server-generated as explained in the AMQP 0-9-1 specification document.

Consumer Memory Safety Requirements

As of version 6.0 of the .NET client, message payloads are represented using the System.ReadOnlyMemory<byte> type from the System.Memory library.

This library places certain restrictions on when a read only memory span can be accessed by applications.

Important: consumer interface implementations must deserialize or copy delivery payload before delivery handler method returns. Retaining a reference to the payload is not safe: the memory allocated for it can be deallocated at any moment after the handler returns.
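The difference between copying a payload and retaining a reference to it can be illustrated without a broker. The sketch below is a simulation only: a plain byte array stands in for a buffer owned by the client library, and overwriting it stands in for the library reusing that memory after the handler returns. The class and method names are hypothetical.

```csharp
using System;
using System.Text;

public static class PayloadCopySketch
{
    // Simulates a delivery whose underlying buffer is reused after the
    // handler returns. Returns what a private copy and a retained
    // ReadOnlyMemory<byte> decode to once the buffer has been reused.
    public static (string Copied, string Retained) Demonstrate()
    {
        // Stand-in for memory owned by the client library.
        byte[] libraryBuffer = Encoding.UTF8.GetBytes("first");
        ReadOnlyMemory<byte> body = libraryBuffer;

        // Safe: take a private copy while still inside the "handler".
        byte[] copy = body.ToArray();

        // Unsafe: keep the ReadOnlyMemory<byte> itself for later use.
        ReadOnlyMemory<byte> retained = body;

        // After the handler returns, the buffer is reused for other data.
        Encoding.UTF8.GetBytes("other").CopyTo(libraryBuffer, 0);

        return (Encoding.UTF8.GetString(copy),
                Encoding.UTF8.GetString(retained.ToArray()));
    }
}
```

In this simulation the private copy still decodes to "first", while the retained reference now decodes to "other", which is the kind of silent corruption the requirement above is meant to prevent.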

Async Consumer Implementations

The client provides an async-oriented consumer dispatch implementation. This dispatcher can only be used with async consumers, that is, IAsyncBasicConsumer implementations.

In order to use this dispatcher, set the ConnectionFactory.DispatchConsumersAsync property to true:

ConnectionFactory factory = new ConnectionFactory();
// ...
// use async-oriented consumer dispatcher. Only compatible with IAsyncBasicConsumer implementations
factory.DispatchConsumersAsync = true;

then register a consumer that implements IAsyncBasicConsumer, such as AsyncEventingBasicConsumer or AsyncDefaultBasicConsumer:

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (ch, ea) =>
    {
        var body = ea.Body.ToArray();
        // copy or deserialise the payload
        // and process the message
        // ...

        // ch is the event sender (an object), so acknowledge on the channel itself
        channel.BasicAck(ea.DeliveryTag, false);
        await Task.Yield();
    };
// this consumer tag identifies the subscription
// when it has to be cancelled
string consumerTag = channel.BasicConsume(queueName, false, consumer);

Fetching Individual Messages (Polling or "pull API")

It is also possible to retrieve individual messages on demand ("pull API" a.k.a. polling). This approach to consumption is very inefficient as it is effectively polling and applications repeatedly have to ask for results even if the vast majority of the requests yield no results. Therefore using this approach is highly discouraged.

To "pull" a message, use the IModel.BasicGet method. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:

bool autoAck = false;
BasicGetResult result = channel.BasicGet(queueName, autoAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    ReadOnlyMemory<byte> body = result.Body;
    ...

The above example uses manual acknowledgements (autoAck = false), so the application must also call IModel.BasicAck to acknowledge the delivery after processing:

    ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}

Note that fetching messages using this API is relatively inefficient. If you'd prefer RabbitMQ to push messages to the client, see the section on retrieving messages by subscription ("push API") above.

Concurrency Considerations for Consumers

There are a number of concurrency-related topics for a library user to consider.

Sharing Channels Between Threads

As a rule of thumb, IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances.

This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them.

If more than one thread needs to access a particular IModel instance, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}

Symptoms of incorrect serialisation of IModel operations include, but are not limited to,

  • connection-level exceptions due to invalid frame interleaving on the wire. RabbitMQ server logs will contain unexpected frame errors in such scenarios.
  • pipelining and continuation exceptions thrown by the client

Consumption that involves sharing a channel between threads should be avoided when possible but can be done safely.

Consumers that can be multi-threaded or use a thread pool internally, including TPL-based consumers, must use mutual exclusion for acknowledgement operations on a shared channel.

Per-Connection Thread Use

Each IConnection instance is, in the current implementation, backed by a single background thread that reads from the socket and dispatches the resulting events to the application. If heartbeats are enabled, they will use a pair of .NET timers per connection.

Usually, therefore, there will be at least two threads active in an application using this library:

  • the application thread: contains the application logic, and makes calls on IModel methods to perform protocol operations.
  • the I/O activity thread: hidden away and completely managed by the IConnection instance.

The one place where the nature of the threading model is visible to the application is in any callback the application registers with the library. Such callbacks include:

  • any IBasicConsumer method
  • the BasicReturn event on IModel
  • any of the various shutdown events on IConnection, IModel etc.

Consumer Callbacks and Ordering

As of version 3.5.0, application callback handlers can invoke blocking operations (such as IModel.QueueDeclare or IModel.BasicCancel). IBasicConsumer callbacks are invoked concurrently. However, per-channel operation order is preserved. In other words, if messages A and B were delivered in this order on the same channel, they will be processed in this order. If messages A and B were delivered on different channels, they can be processed in any order (or in parallel). Consumer callbacks are invoked in tasks dispatched to a TaskScheduler.

Using a Custom Task Scheduler

It is possible to use a custom task scheduler by setting ConnectionFactory.TaskScheduler:

public class CustomTaskScheduler : TaskScheduler
{
  // ...
}

var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();

This, for example, can be used to limit concurrency degree with a custom TaskScheduler.

Handling Unroutable Messages

If a message is published with the "mandatory" flag set, but cannot be delivered, the broker will return it to the sending client (via a basic.return AMQP 0-9-1 command).

To be notified of such returns, clients can subscribe to the IModel.BasicReturn event. If there are no listeners attached to the event, then returned messages will be silently dropped.

model.BasicReturn += new RabbitMQ.Client.Events.BasicReturnEventHandler(...);

The BasicReturn event will fire, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.
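The following sketch shows one way a handler could be attached and a message published with the mandatory flag. The class and method names are hypothetical, and the code assumes the 6.x client, where BasicReturnEventArgs.Body is a ReadOnlyMemory<byte> and BasicPublish accepts the named parameters shown.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ReturnedMessagesSketch
{
    // Attach once per channel; every returned message is printed.
    public static void RegisterReturnHandler(IModel channel)
    {
        channel.BasicReturn += (sender, args) =>
        {
            // Copy the body before the handler returns.
            string text = Encoding.UTF8.GetString(args.Body.ToArray());
            Console.WriteLine(
                $"Returned: {args.ReplyCode} {args.ReplyText}, " +
                $"exchange={args.Exchange}, routingKey={args.RoutingKey}, body={text}");
        };
    }

    // Publishes with mandatory = true so that unroutable messages are returned.
    public static void PublishMandatory(IModel channel, string exchangeName, string routingKey)
    {
        byte[] body = Encoding.UTF8.GetBytes("Hello, world!");
        channel.BasicPublish(
            exchange: exchangeName,
            routingKey: routingKey,
            mandatory: true,
            basicProperties: null,
            body: body);
    }
}
```

Returns arrive asynchronously, so the handler should be registered before the first mandatory publish on the channel.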

Automatic Recovery From Network Failures

Connection Recovery

Network connection between clients and RabbitMQ nodes can fail. RabbitMQ .NET/C# client supports automatic recovery of connections and topology (queues, exchanges, bindings, and consumers). The feature has certain limitations covered later in this guide.

The automatic recovery process performs the following steps:

  • Reconnect
  • Restore connection listeners
  • Re-open channels
  • Restore channel listeners
  • Restore channel basic.qos setting, publisher confirms and transaction settings

Topology recovery starts after the above actions are completed. The following steps are performed for every channel known to be open at the time of connection failure:

  • Re-declare exchanges (except for predefined ones)
  • Re-declare queues
  • Recover all bindings
  • Recover all consumers

To enable automatic connection recovery, set ConnectionFactory.AutomaticRecoveryEnabled to true:

ConnectionFactory factory = new ConnectionFactory();
factory.AutomaticRecoveryEnabled = true;
// connection that will recover automatically
IConnection conn = factory.CreateConnection();

If recovery fails due to an exception (e.g. RabbitMQ node is still not reachable), it will be retried after a fixed time interval (default is 5 seconds). The interval can be configured:

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

When Will Connection Recovery Be Triggered?

Automatic connection recovery, if enabled, will be triggered by the following events:

  • An I/O exception is thrown in connection's I/O loop
  • A socket read operation times out
  • Missed server heartbeats are detected
  • Any other unexpected exception is thrown in connection's I/O loop

whichever happens first.

If the initial client connection to a RabbitMQ node fails, automatic connection recovery won't kick in. Application developers are responsible for retrying such connections, logging failed attempts, implementing a limit to the number of retries and so on. Here's a very basic example:

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
  IConnection conn = factory.CreateConnection();
} catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) {
  Thread.Sleep(5000);
  // apply retry logic
}
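The retry logic hinted at above could be factored into a small helper with a limit on the number of attempts. The sketch below is generic and does not depend on the client library; the names RetrySketch and Retry are hypothetical, and the fixed delay between attempts is an assumption rather than a recommendation.

```csharp
using System;
using System.Threading;

public static class RetrySketch
{
    // Calls action until it succeeds or maxAttempts is reached.
    // Only exceptions of type TException are retried; when the last
    // attempt fails, its exception propagates to the caller.
    public static T Retry<T, TException>(Func<T> action, int maxAttempts, TimeSpan delay)
        where TException : Exception
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (TException) when (attempt < maxAttempts)
            {
                // Log the failed attempt, then wait before trying again.
                Console.Error.WriteLine($"Attempt {attempt} of {maxAttempts} failed");
                Thread.Sleep(delay);
            }
        }
    }
}
```

With the client library referenced, a call could look like RetrySketch.Retry<IConnection, BrokerUnreachableException>(() => factory.CreateConnection(), 5, TimeSpan.FromSeconds(5)).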

When a connection is closed by the application via the IConnection.Close method, connection recovery will not be initiated.

Channel-level exceptions will not trigger any kind of recovery as they usually indicate a semantic issue in the application (e.g. an attempt to consume from a non-existent queue).

Effects on Publishing

Messages that are published using IModel.BasicPublish when the connection is down will be lost. The client does not enqueue them for delivery after the connection has recovered. To ensure that published messages reach RabbitMQ, applications need to use Publisher Confirms and account for connection failures.
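A minimal sketch of publishing with publisher confirms is shown below. The class and method names are hypothetical and the 5 second timeout is an arbitrary assumption; ConfirmSelect and WaitForConfirmsOrDie are IModel methods.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ConfirmedPublishSketch
{
    // Publishes one message and blocks until the broker confirms it.
    public static void PublishConfirmed(
        IModel channel, string exchangeName, string routingKey, string text)
    {
        // Put the channel into confirm mode.
        channel.ConfirmSelect();

        byte[] body = Encoding.UTF8.GetBytes(text);
        channel.BasicPublish(exchangeName, routingKey, null, body);

        // Throws an exception if the message is nacked or the timeout elapses.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

Waiting after every message is the simplest approach but limits throughput; applications that publish at a high rate would typically wait for confirms in batches or track them asynchronously. A caller that catches the exception can republish the message once the connection is available again.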

Topology Recovery

Topology recovery involves recovery of exchanges, queues, bindings and consumers. It is enabled by default but can be disabled:

ConnectionFactory factory = new ConnectionFactory();
// recovery settings must be set before the connection is created
factory.AutomaticRecoveryEnabled = true;
factory.TopologyRecoveryEnabled  = false;

IConnection conn = factory.CreateConnection();

Failure Detection and Recovery Limitations

Automatic connection recovery has a number of limitations and intentional design decisions that application developers need to be aware of.

When a connection is down or lost, it takes time to detect. Therefore there is a window of time in which both the library and the application are unaware of effective connection failure. Any messages published during this time frame are serialised and written to the TCP socket as usual. Their delivery to the broker can only be guaranteed via publisher confirms: publishing in AMQP 0-9-1 is entirely asynchronous by design.

When a socket or I/O operation error is detected by a connection with automatic recovery enabled, recovery begins after a configurable delay, 5 seconds by default. This design assumes that even though a lot of network failures are transient and generally short lived, they do not go away in an instant. Connection recovery attempts will continue at identical time intervals until a new connection is successfully opened.

When a connection is in the recovering state, any publishes attempted on its channels will be rejected with an exception. The client currently does not perform any internal buffering of such outgoing messages. It is an application developer's responsibility to keep track of such messages and republish them when recovery succeeds. Publisher confirms is a protocol extension that should be used by publishers that cannot afford message loss.

Connection recovery will not kick in when a channel is closed due to a channel-level exception. Such exceptions often indicate application-level issues. The library cannot make an informed decision about when that's the case.

Closed channels won't be recovered even after connection recovery kicks in. This includes both explicitly closed channels and the channel-level exception case above.

Manual Acknowledgements and Automatic Recovery

When manual acknowledgements are used, it is possible that the network connection to a RabbitMQ node fails between message delivery and acknowledgement. After connection recovery, RabbitMQ will reset delivery tags on all channels.

This means that basic.ack, basic.nack, and basic.reject with old delivery tags will cause a channel exception. To avoid this, RabbitMQ .NET client keeps track of and updates delivery tags to make them monotonically growing between recoveries.

IModel.BasicAck, IModel.BasicNack, and IModel.BasicReject then translate adjusted delivery tags into those used by RabbitMQ.

Acknowledgements with stale delivery tags will not be sent. Applications that use manual acknowledgements and automatic recovery must be capable of handling redeliveries.
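The tag adjustment described above can be pictured with a small model. This is an illustration of the idea only and not the client's actual implementation; the class DeliveryTagTranslator and its members are hypothetical.

```csharp
using System;

// Illustrative model of delivery tag adjustment across recoveries.
public class DeliveryTagTranslator
{
    private ulong _offset;   // highest adjusted tag issued before the last recovery
    private ulong _maxSeen;  // highest adjusted tag handed to the application

    // Called for each delivery: converts a server tag into an adjusted,
    // monotonically growing tag that the application sees.
    public ulong ToAdjusted(ulong serverTag)
    {
        ulong adjusted = _offset + serverTag;
        if (adjusted > _maxSeen)
        {
            _maxSeen = adjusted;
        }
        return adjusted;
    }

    // Called after connection recovery: the server restarts its tags,
    // so every tag issued so far becomes stale.
    public void OnRecovery()
    {
        _offset = _maxSeen;
    }

    // Called on acknowledgement: returns false for stale tags, which must
    // not be sent, and otherwise yields the tag the server expects.
    public bool TryToServerTag(ulong adjustedTag, out ulong serverTag)
    {
        if (adjustedTag <= _offset)
        {
            serverTag = 0;
            return false;
        }
        serverTag = adjustedTag - _offset;
        return true;
    }
}
```

In this model, a delivery received before a recovery can no longer be acknowledged afterwards, which is why the message will be redelivered and the application has to handle it a second time.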

Getting Help and Providing Feedback

If you have questions about the contents of this guide or any other topic related to RabbitMQ, don't hesitate to ask them on the RabbitMQ mailing list.
