RabbitMQ


The RabbitMQ Server implements a number of extensions of the AMQP specification, which we are documenting here. There are also some experimental features, not directly related to the specification, which we consider to be useful but require further testing.

Queue Leases

The x-expires argument to queue.declare controls how long a queue can be unused before it is automatically deleted. Unused means that the queue has no consumers and that no consumer-related operations (basic.get or basic.cancel) have occurred for at least the expiration period. This can be used, for example, for RPC-style reply queues, where many queues can be created which may never be drained.

The server guarantees that the queue will be deleted if it is unused for at least the expiration period. No guarantee is given as to how promptly the queue will be removed after the expiration period has elapsed.

The value of the x-expires argument must be a long integer, greater than zero, describing the expiration period in milliseconds. Thus a value of 1000L means a queue which is unused for 1 second will be deleted.
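The definition of "unused" above can be illustrated with a small in-memory model. This is only a sketch of the rule as described here, not the server's implementation; the class QueueLeaseSketch and its members are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```java
// Hypothetical model of the lease rule; not RabbitMQ code.
final class QueueLeaseSketch {
    final long expiresMillis;   // the x-expires value
    int consumers = 0;          // number of active consumers
    long lastUsedMillis;        // time of the last consumer-related operation

    QueueLeaseSketch(long expiresMillis, long nowMillis) {
        if (expiresMillis <= 0) {
            throw new IllegalArgumentException("x-expires must be greater than zero");
        }
        this.expiresMillis = expiresMillis;
        this.lastUsedMillis = nowMillis;
    }

    // Call for consumer-related operations such as basic.get or basic.cancel.
    void touch(long nowMillis) {
        lastUsedMillis = nowMillis;
    }

    // True once the queue has no consumers and has been idle for at least
    // the expiration period; the server may delete it any time after that.
    boolean eligibleForDeletion(long nowMillis) {
        return consumers == 0 && nowMillis - lastUsedMillis >= expiresMillis;
    }

    public static void main(String[] argv) {
        QueueLeaseSketch queue = new QueueLeaseSketch(1000L, 0L);
        System.out.println(queue.eligibleForDeletion(999L));
        System.out.println(queue.eligibleForDeletion(1000L));
    }
}
```

Note that the model only says when a queue becomes eligible for deletion; as stated above, the server makes no promise about how soon afterwards the deletion happens.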

Usage

This example in Java creates a queue which expires after it has been unused for 30 minutes.

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000L);
channel.queueDeclare("myqueue", false, false, false, args);
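Writing the period as a raw millisecond literal is easy to get wrong. The following is a minimal sketch of a helper that builds the arguments table from a duration; QueueLeaseArgs and expiresAfter are hypothetical names and not part of the RabbitMQ Java client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the RabbitMQ Java client.
final class QueueLeaseArgs {
    // Builds a queue.declare arguments table whose x-expires entry is the
    // given period converted to milliseconds and stored as a Long.
    static Map<String, Object> expiresAfter(long amount, TimeUnit unit) {
        long millis = unit.toMillis(amount);
        if (millis <= 0) {
            throw new IllegalArgumentException("x-expires must be greater than zero");
        }
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-expires", Long.valueOf(millis));
        return args;
    }

    public static void main(String[] argv) {
        // 30 minutes, the same period as in the example above
        System.out.println(expiresAfter(30, TimeUnit.MINUTES));
    }
}
```

The returned map can be passed as the last argument of channel.queueDeclare in place of the hand-built args.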

Alternate Exchanges

It is sometimes desirable to let clients handle messages that an exchange was unable to route (either because there were no bound queues or because there were no matching bindings). Typical examples of this are

  • detecting when clients accidentally or maliciously publish messages that cannot be routed
  • "or else" routing semantics where some messages are handled specially and the rest by a generic handler

RabbitMQ's Alternate Exchange ("AE") feature addresses these use cases.

Configuration

When creating an exchange the name of an AE can be optionally supplied in the exchange.declare method's arguments table by specifying a key of 'alternate-exchange' and a value of type 'S' (string) containing the name.

When an AE has been specified, in addition to the usual configure permissions on the declared exchange, the user needs to have read permissions on that exchange and write permissions on the AE.

For example:

Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");

In the above fragment of Java code we create a direct exchange 'my-direct' that is configured with an AE called 'my-ae'. The latter is declared as a fanout exchange. We bind one queue 'routed' to 'my-direct' with a binding key of 'key1', and a queue 'unrouted' to 'my-ae'.

Operation

Whenever an exchange with a configured AE cannot route a message to any queue (or the message was marked as immediate and it cannot be delivered to any consumer) then it publishes the message to the specified AE instead. If that AE does not exist then a warning is logged.

For example:

channel.basicPublish("my-direct", "key1", null, "test".getBytes());
GetResponse r1 = channel.basicGet("routed", true); //returns message
channel.basicPublish("my-direct", "key2", null, "test".getBytes());
GetResponse r2 = channel.basicGet("unrouted", true); //returns message

Here we first publish a message to 'my-direct' with a routing key of 'key1'. That message is routed to the 'routed' queue, in accordance with the standard AMQP behaviour. However, when publishing a message to 'my-direct' with a routing key of 'key2', rather than being discarded the message is routed via our configured AE to the 'unrouted' queue.

When an AE cannot route a message, it in turn publishes the message to its AE, if it has one configured. This process continues until either the message is successfully routed, the end of the chain of AEs is reached, or an AE is encountered which has already attempted to route the message.
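The three stopping conditions can be sketched with a toy in-memory model. This is an illustration of the rule described above, not the broker's code: AeChainSketch and its fields are hypothetical, and each exchange is simplified to "routes to one queue or to none" rather than evaluating bindings against a routing key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of AE chaining; none of these types exist in RabbitMQ.
final class AeChainSketch {
    // exchange name -> name of its alternate exchange (absent if none)
    final Map<String, String> alternates = new HashMap<String, String>();
    // exchange name -> queue it can route to (absent if it routes nowhere)
    final Map<String, String> routes = new HashMap<String, String>();

    // Returns the queue the message ends up in, or null if it is unroutable.
    String route(String exchange) {
        Set<String> tried = new HashSet<String>();
        String current = exchange;
        // Stop at the end of the chain, or when an exchange is reached
        // that has already attempted to route this message.
        while (current != null && tried.add(current)) {
            String queue = routes.get(current);
            if (queue != null) {
                return queue; // successfully routed
            }
            current = alternates.get(current);
        }
        return null;
    }

    public static void main(String[] argv) {
        AeChainSketch model = new AeChainSketch();
        model.alternates.put("my-direct", "my-ae");
        model.routes.put("my-ae", "unrouted");
        System.out.println(model.route("my-direct")); // prints "unrouted"
    }
}
```

Tracking the set of exchanges already tried is what keeps a cycle of AEs (for example, two exchanges naming each other as alternates) from looping forever.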

The semantics of the 'mandatory' and 'immediate' flags carry through to AEs. So, for example, if a sender specified the 'mandatory' flag on publication of a message, and that message could not be routed by the original exchange, but was then routed to a queue via an AE, then no basic.return is issued.

Message properties are carried through when messages are routed via AEs. For example, when a message that was marked as persistent is routed to some queue via an AE then it does get persisted.

The queues to which a message is routed from an AE participate in AMQP's tx transactions, i.e. if an unroutable message was published inside a tx then it will only appear in the queues when that tx is committed.

Memory-based flow control

The RabbitMQ server detects the total amount of RAM installed in the computer on startup. By default, when the RabbitMQ server uses more than 40% of the installed RAM, it raises a memory alarm. In response, the server pauses reading from the sockets of connected clients which send content-bearing methods (such as basic.publish) after the alarm was raised. Connection heartbeat monitoring is disabled too. Once the memory alarm has cleared (e.g. due to the server paging messages to disk or delivering them to clients), normal service resumes.

The intent here is to introduce a flow control mechanism that throttles producers but lets consumers continue unaffected. However, since AMQP permits producers and consumers to operate on the same channel, and on different channels of a single connection, this logic is necessarily imperfect. In practice that does not pose any problems for most applications since the throttling is observable merely as a delay. Nevertheless, other design considerations permitting, it is advisable to only use individual AMQP connections for either producing or consuming.

The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%; it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM), and non-compliant AMQP clients can cause the RabbitMQ server to use more still. It is strongly recommended that OS swap or page files are enabled.

32-bit architectures impose a per-process memory limit of 4GB, though under Windows this is frequently further reduced to 2GB. Common implementations of 64-bit architectures (i.e. AMD64 and Intel EM64T) permit only a paltry 256TB per process. 64-bit Windows again further limits this to 8TB. However, note that even under 64-bit Windows, a 32-bit process frequently only has a maximum address space of 2GB.

Usage

The memory threshold at which the flow control is triggered can be adjusted by editing the rabbitmq.config file (in the appropriate location for your platform, as discussed in the installation guide). The example below sets the threshold to the default value of 0.4:

[{rabbit, [{vm_memory_high_watermark, 0.4}]}].

The default value of 0.4 stands for 40% of installed RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit Windows platform, if you have 4GB of RAM installed, 40% of 4GB is 1.6GB, but 32-bit Windows normally limits processes to 2GB, so the threshold is actually set to 40% of 2GB (roughly 820MB).
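The "whichever is smaller" rule can be written out as a short calculation. The sketch below only reproduces the arithmetic described above; MemoryThresholdSketch and thresholdBytes are hypothetical names, and the way the server actually obtains the RAM and address space figures is not shown.

```java
// Hypothetical illustration of the threshold arithmetic; not RabbitMQ code.
final class MemoryThresholdSketch {
    static final long MB = 1024L * 1024L;

    // The watermark is applied to the smaller of installed RAM and the
    // virtual address space available to the process.
    static long thresholdBytes(double watermark, long installedRamBytes, long addressSpaceBytes) {
        long basis = Math.min(installedRamBytes, addressSpaceBytes);
        return (long) (watermark * basis);
    }

    public static void main(String[] argv) {
        // 4GB of RAM installed, but the process is limited to 2GB of address space
        long threshold = thresholdBytes(0.4, 4096 * MB, 2048 * MB);
        System.out.println(threshold / MB + "MB"); // 819MB, i.e. the "820MB" quoted above after rounding
    }
}
```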

A value of 0 disables the memory monitor and throttling of producers via channel.flow.

The memory limit is appended to the RABBITMQ_NODENAME.log file when the RabbitMQ server starts:

=INFO REPORT==== 29-Oct-2009::15:43:27 ===
Memory limit set to 2048MB.

Versions of Erlang prior to R13B had a bug that meant they did not reliably detect the amount of memory installed on Windows platforms. If RabbitMQ is running under Windows on a version of Erlang prior to R13B, then it does not attempt to automatically set a threshold. Instead, it behaves as if it is unable to recognise the platform: see below.

Unrecognised platforms

If the RabbitMQ server is unable to recognise your system, or if you're using Windows and a version of Erlang prior to R13B, it will append a warning to the RABBITMQ_NODENAME.log file. It then assumes that 1GB of RAM is installed:

=WARNING REPORT==== 29-Oct-2009::17:23:44 ===
Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.

In this case, the vm_memory_high_watermark configuration value is used to scale the assumed 1GB of RAM. With the default vm_memory_high_watermark of 0.4, RabbitMQ's memory threshold is set to 410MB, so it will throttle producers whenever RabbitMQ is using more than 410MB of memory. For example, if RabbitMQ can't recognise your platform but you actually have 8GB of RAM installed and you want to invoke channel.flow to throttle producers when the RabbitMQ server is using more than 3GB, set vm_memory_high_watermark to 3.
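Because the watermark is a multiplier on the assumed 1GB, the value to configure is simply the desired threshold divided by 1GB. A minimal sketch of that calculation follows; AssumedMemoryWatermark and watermarkFor are hypothetical names used only for illustration.

```java
// Hypothetical illustration of scaling against the assumed 1GB; not RabbitMQ code.
final class AssumedMemoryWatermark {
    // RAM size the server assumes on an unrecognised platform, in MB
    static final double ASSUMED_RAM_MB = 1024.0;

    // Watermark value that yields the desired threshold (in MB) when the
    // server scales the assumed 1GB rather than the real amount of RAM.
    static double watermarkFor(double targetThresholdMb) {
        return targetThresholdMb / ASSUMED_RAM_MB;
    }

    public static void main(String[] argv) {
        System.out.println(watermarkFor(3 * 1024.0)); // prints 3.0: throttle above 3GB
    }
}
```

Values above 1 are therefore meaningful in this situation, since they are relative to the assumed 1GB and not to the RAM actually installed.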

It is advisable not to set the threshold above 50% of your installed RAM. Note also that under Windows it is commonly the case that a maximum of 2GB of memory is available to RabbitMQ.