
The RabbitMQ Server implements a number of extensions of the AMQP specification, which we are documenting here. There are also some experimental features, not directly related to the specification, which we consider to be useful but require further testing.
The x-expires argument to
queue.declare controls for how long a queue can
be unused before it is automatically
deleted. Unused means the queue has no consumers,
and no consumer related operations (basic.get or
basic.cancel) have occurred for
a duration of at least the expiration period. This can be
used, for example, for RPC-style reply queues, where many
queues can be created which may never be drained.
The server guarantees that the queue will be deleted, if unused for at least the expiration period. No guarantee is given as to how promptly the queue will be removed after the expiration period has elapsed.
The value of the x-expires argument must be a
long integer, greater than zero, describing the
expiration period in milliseconds. Thus a value of 1000L means
a queue which is unused for 1 second will be deleted.
This example in Java creates a queue which expires after it has been unused for 30 minutes.
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000L);
channel.queueDeclare("myqueue", false, false, false, args);
It is sometimes desirable to let clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues our no matching bindings). Typical examples of this are
RabbitMQ's Alternate Exchange ("AE") feature addresses these use cases.
When creating an exchange the name of an AE can be
optionally supplied in the exchange.declare
method's arguments table by specifying a key
of 'alternate-exchange' and a value of type 'S' (string)
containing the name.
When an AE has been specified, in addition to the usual configure permissions on the declared exchange, the user needs to have read permissions on that exchange and write permissions on the AE.
For example:
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");
In the above fragment of Java code we create a direct exchange 'my-direct' that is configured with an AE called 'my-ae'. The latter is declared as a fanout exchange. We bind one queue 'routed' to 'my-direct' with a binding key of 'key1', and a queue 'unrouted' to 'my-ae'.
Whenever an exchange with a configured AE cannot route a message to any queue (or the message was marked as immediate and it cannot be delivered to any consumer) then it publishes the message to the specified AE instead. If that AE does not exist then a warning is logged.
For example:
channel.basicPublish("my-direct", "key2", null, "test".getBytes());
GetResponse r2 = channel.basicGet("unrouted", true); //returns message
channel.basicPublish("my-direct", "key1", null, "test".getBytes());
GetResponse r1 = channel.basicGet("routed", true); //returns message
Here we first publish a message to 'my-direct' with a routing key of 'key1'. That message is routed to the 'routed' queue, in accordance with the standard AMQP behaviour. However, when publishing a message to 'my-direct' with a routing key of 'key2', rather than being discarded the message is routed via our configured AE to the 'unrouted' queue.
When an AE cannot route a message, it in turn publishes the message to its AE, if it has one configured. This process continues until either the message is successfully routed, the end of the chain of AEs is reached, or an AE is encountered which has already attempted to route the message.
The semantics of the 'mandatory' and 'immediate' flags
carries through to AEs. So, for example, if a sender
specified the 'mandatory' flag on publication of a
message, and that message could not be routed by the
original exchange, but was then routed to a queue via an
AE, then no basic.return is issued.
Message properties are carried through when messages are routed via AEs. For example, when a message that was marked as persistent is routed to some queue via an AE then it does get persisted.
The queues to which a messages is routed from an AE
participate in AMQP's tx transactions,
i.e. if an unroutable message was published inside a tx
then it will only appear in the queues when that tx is
committed.
The RabbitMQ server detects the total amount of RAM
installed in the computer on startup. By default, when the
RabbitMQ server uses above 40% of the installed RAM, it
raises a memory alarm. In response the server pauses reading
from the sockets of connected clients which send
content-bearing methods (such as basic.publish)
after the alarm was raised. Connection heartbeat monitoring
gets disabled too. Once the memory alarm has cleared
(e.g. due to the server paging messages to disk or
delivering them to clients) normal service resumes.
The intent here is to introduce a flow control mechanism that throttles producers but lets consumers continue unaffected. However, since AMQP permits producers and consumers to operate on the same channel, and on different channels of a single connection, this logic is necessarily imperfect. In practice that does not pose any problems for most applications since the throttling is observable merely as a delay. Nevertheless, other design considerations permitting, it is advisable to only use individual AMQP connections for either producing or consuming.
The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%, it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM), and non-compliant AMQP clients can cause the RabbitMQ server to use more still. It is strongly recommended that OS swap or page files are enabled.
32-bit architectures impose a per process memory limit of 4GB, though under Windows, this is frequently further reduced to 2GB. Common implementations of 64-bit architectures (i.e. AMD64 and Intel EM64T) permit only a paltry 256TB per process. 64-bit Windows again further limits this to 8TB. However, note that even under 64-bit Windows, a 32-bit process frequently only has a maximum address space of 2GB.
The memory threshold at which the flow control is
triggered can be adjusted by editing the rabbitmq.config
file (in the appropriate location for your platform, as
discussed in the installation
guide). The example below sets the threshold to the
default value of 0.4:
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
The default value of 0.4 stands for 40% of installed RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit Windows platform, if you have 4GB of RAM installed, 40% of 4GB is 1.6GB, but 32-bit Windows normally limits processes to 2GB, so the threshold is actually to 40% of 2GB (which is 820MB).
A value of 0 disables the memory monitor and throttling of
producers via channel.flow.
The memory limit is appended to the RABBITMQ_NODENAME.log file when the RabbitMQ server starts:
=INFO REPORT==== 29-Oct-2009::15:43:27 === Memory limit set to 2048MB.
Versions of Erlang prior to R13B had a bug that meant they do not reliably detect the amount of memory installed on Windows platforms. If RabbitMQ is running under Windows on a version of Erlang prior to R13B, then it does not attempt to automatically set a threshold. Instead, it behaves as if it's unable to recognise the platform: see below.
If the RabbitMQ server is unable to recognise your system, or if you're using Windows and a version of Erlang prior to R13B, it will append a warning to the RABBITMQ_NODENAME.log file. It then assumes than 1GB of RAM is installed:
=WARNING REPORT==== 29-Oct-2009::17:23:44 ===
Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.
In this case, the vm_memory_high_watermark
configuration value is used to scale the assumed 1GB
RAM. With the default value of
vm_memory_high_watermark set to 0.4,
RabbitMQ's memory threshold is set to 410MB, thus it will
throttle producers whenever RabbitMQ is using more than
410MB memory. Thus when RabbitMQ can't recognize your
platform, if you actually have 8GB RAM installed and you
want to invoke channel.flow to throttle
producers when the RabbitMQ server is using above 3GB, set
vm_memory_high_watermark to 3.
It is advised you do not set the threshold above 50% of your installed RAM, and to note that under Windows, it is commonly the case that a maximum of 2GB memory is available to RabbitMQ.