Highly Available (Mirrored) Queues

By default, queues within a RabbitMQ cluster are located on a single node (the node on which they were first declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be mirrored across multiple nodes. Each mirrored queue consists of one master and one or more mirrors, with the oldest mirror being promoted to the new master if the old master disappears for any reason.

Messages published to the queue are replicated to all mirrors. Consumers are connected to the master regardless of which node they connect to, with mirrors dropping messages that have been acknowledged at the master. Queue mirroring therefore enhances availability, but does not distribute load across nodes (all participating nodes each do all the work).

This solution requires a RabbitMQ cluster, which means that it will not cope seamlessly with network partitions within the cluster and, for that reason, is not recommended for use across a WAN (though of course, clients can still connect from as near and as far as needed).

There are multiple terms commonly used to identify primary and secondary replicas in a distributed system. This guide typically uses "master" to refer to the primary replica of a queue and "mirror" for secondary replicas. However, you will find "slave" used here and there. This is because RabbitMQ CLI tools have historically used the term "slave" to refer to secondaries. Both terms are therefore currently used interchangeably, but we'd like to eventually get rid of the legacy terminology.

How Mirroring is Configured

Before we provide an example of how to enable mirroring, we should look at how it was done historically and what the recommended way is today.

In addition to mandatory properties (e.g. durable or exclusive), queues in RabbitMQ have optional parameters (arguments), sometimes referred to as x-arguments. Those are provided by clients when they declare queues and control various optional queue features, such as mirroring or TTL. The x-arguments can be used to configure mirroring parameters but there's a better way.

A more flexible, unobtrusive, and manageable way of configuring x-arguments is via policies. A policy matches one or more queues by name (using a regular expression pattern) and appends its definition (a map of optional arguments) to the x-arguments of the matching queues. In other words, it is possible to configure x-arguments for multiple queues at once with a policy, and update them all at once by updating the policy definition. This is the most common and recommended way of configuring mirroring in modern RabbitMQ versions. Please see the documentation on policies for more information.

Queue Arguments that Control Mirroring

As we've covered above, queues typically have mirroring enabled via policy. Policies can change at any time; it is valid to create a non-mirrored queue, and then make it mirrored at some later point (and vice versa). There is a difference between a non-mirrored queue and a mirrored queue which does not have any mirrors - the former lacks the extra mirroring infrastructure and will likely provide higher throughput.

You should be aware of how newly added mirrors behave (see Unsynchronised Mirrors below).

To cause queues to become mirrored, you should create a policy which matches them and sets policy keys ha-mode and (optionally) ha-params. The following table explains the options for these keys:

| ha-mode | ha-params | Result |
|---------|-----------|--------|
| all | (absent) | Queue is mirrored across all nodes in the cluster. When a new node is added to the cluster, the queue will be mirrored to that node. |
| exactly | count | Queue is mirrored to count nodes in the cluster. If there are fewer than count nodes in the cluster, the queue is mirrored to all nodes. If there are more than count nodes in the cluster, and a node containing a mirror goes down, then a new mirror will be created on another node. Use of exactly mode with "ha-promote-on-shutdown": "always" can be dangerous since queues can migrate across a cluster and become unsynced as it is brought down. |
| nodes | node names | Queue is mirrored to the nodes listed in node names. Node names are the Erlang node names as they appear in rabbitmqctl cluster_status; they usually have the form "rabbit@hostname". If any of those node names are not a part of the cluster, this does not constitute an error. If none of the nodes in the list are online at the time when the queue is declared then the queue will be created on the node that the declaring client is connected to. |
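
The rows above can be read as a small placement rule. The Java sketch below is only an illustration of that reading, not RabbitMQ's implementation: the class HaModeSketch and its methods are hypothetical, and it deliberately does not model which nodes the broker picks in exactly mode or the fallback to the declaring client's node in nodes mode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of any RabbitMQ API.
final class HaModeSketch {

    // ha-mode "all": the queue is present on every node in the cluster.
    static int replicasForAll(int clusterSize) {
        return clusterSize;
    }

    // ha-mode "exactly": the queue is present on `count` nodes,
    // or on all nodes when the cluster has fewer than `count` nodes.
    static int replicasForExactly(int count, int clusterSize) {
        return Math.min(count, clusterSize);
    }

    // ha-mode "nodes": listed names that are not cluster members are
    // silently ignored rather than treated as an error.
    static List<String> replicasForNodes(List<String> listed, List<String> cluster) {
        List<String> result = new ArrayList<>();
        for (String node : listed) {
            if (cluster.contains(node)) {
                result.add(node);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> cluster = Arrays.asList("rabbit@nodeA", "rabbit@nodeB", "rabbit@nodeC");
        System.out.println("all: " + replicasForAll(cluster.size()));
        System.out.println("exactly 2: " + replicasForExactly(2, cluster.size()));
        System.out.println("nodes: " + replicasForNodes(
                Arrays.asList("rabbit@nodeA", "rabbit@nodeX"), cluster));
    }
}
```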

Whenever the HA policy for a queue changes it will endeavour to keep its existing mirrors as far as this fits with the new policy.

To How Many Nodes to Mirror?

Note that mirroring to all nodes is the most conservative option and is unnecessary in many cases. For clusters of 3 or more nodes it is recommended to mirror to a quorum (the majority) of nodes, e.g. 2 nodes in a 3 node cluster or 3 nodes in a 5 node cluster. Since some data can be inherently transient or very time sensitive, it can be perfectly reasonable to use a lower number of mirrors for some queues (or even not use any mirroring).
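
As a worked example of the quorum recommendation, the sketch below computes the majority of a cluster and prints a matching policy definition for exactly mode. The class and method names are hypothetical and exist only for this illustration.

```java
// Hypothetical helper for illustration; not part of any RabbitMQ API.
final class MirrorQuorum {

    // Smallest majority of a cluster: floor(n / 2) + 1.
    static int quorum(int clusterSize) {
        if (clusterSize < 1) {
            throw new IllegalArgumentException("cluster size must be positive");
        }
        return clusterSize / 2 + 1;
    }

    // Builds a policy definition that uses "exactly" mode with a quorum of nodes.
    static String exactlyPolicy(int clusterSize) {
        return "{\"ha-mode\":\"exactly\",\"ha-params\":" + quorum(clusterSize) + "}";
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 5}) {
            System.out.println(n + " nodes -> " + exactlyPolicy(n));
        }
    }
}
```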

Queue Masters, Master Migration, Data Locality

Queue Master Location

Every queue in RabbitMQ has a home node. That node is called the queue master. All queue operations go through the master first and then are replicated to mirrors. This is necessary to guarantee FIFO ordering of messages.

Queue masters can be distributed between nodes using several strategies. The strategy can be selected in three ways: using the x-queue-master-locator queue declare argument, setting the queue-master-locator policy key, or defining the queue_master_locator key in the configuration file. Here are the possible strategies and the value that selects each:

  • Pick the node hosting the minimum number of masters: min-masters
  • Pick the node the client that declares the queue is connected to: client-local
  • Pick a random node: random
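
To make the min-masters strategy concrete, here is a minimal sketch of the selection rule. It is not the broker's code: the class name is hypothetical, and breaking ties by node name is an assumption made only so the example is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the "min-masters" idea; not RabbitMQ's code.
final class MinMastersSketch {

    // Returns the node currently hosting the fewest queue masters, or null
    // for an empty map. Ties go to the node name that sorts first, which is
    // an assumption of this sketch.
    static String pick(Map<String, Integer> mastersPerNode) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        // TreeMap iterates in name order, which makes the tie-break deterministic.
        for (Map.Entry<String, Integer> e : new TreeMap<>(mastersPerNode).entrySet()) {
            if (e.getValue() < bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> masters = new HashMap<>();
        masters.put("rabbit@nodeA", 4);
        masters.put("rabbit@nodeB", 1);
        masters.put("rabbit@nodeC", 2);
        System.out.println("new queue master goes to " + pick(masters));
    }
}
```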

"nodes" Policy and Migrating Masters

Note that setting or modifying a "nodes" policy can cause the existing master to go away if it is not listed in the new policy. In order to prevent message loss, RabbitMQ will keep the existing master around until at least one other mirror has synchronised (even if this is a long time). However, once synchronisation has occurred things will proceed just as if the node had failed: consumers will be disconnected from the master and will need to reconnect.

For example, if a queue is on [A B] (with A the master), and you give it a nodes policy telling it to be on [C D], it will initially end up on [A C D]. As soon as the queue synchronises on its new mirrors [C D], the master on A will shut down.
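
The [A B] to [C D] example can be traced with a small sketch. The class below is hypothetical and only models the two states described above: the interim node set that still includes the old master, and the settled set once the new mirrors have synchronised.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical trace of a "nodes" policy change; not RabbitMQ's code.
final class NodesPolicyMigration {

    // Nodes hosting the queue right after the policy change: the existing
    // master is kept (listed first) until a new mirror has synchronised.
    static List<String> interim(String master, List<String> newPolicy) {
        List<String> nodes = new ArrayList<>();
        nodes.add(master);
        for (String node : newPolicy) {
            if (!nodes.contains(node)) {
                nodes.add(node);
            }
        }
        return nodes;
    }

    // Nodes hosting the queue once the new mirrors have synchronised.
    static List<String> settled(List<String> newPolicy) {
        return new ArrayList<>(newPolicy);
    }

    public static void main(String[] args) {
        List<String> policy = Arrays.asList("C", "D");
        System.out.println("interim: " + interim("A", policy));
        System.out.println("settled: " + settled(policy));
    }
}
```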

Exclusive Queues

Exclusive queues will be deleted when the connection that declared them is closed. For this reason, it is not useful for an exclusive queue to be mirrored (or durable for that matter) since when the node hosting it goes down, the connection will close and the queue will need to be deleted anyway.

For this reason, exclusive queues are never mirrored (even if they match a policy stating that they should be). They are also never durable (even if declared as such).

Examples

The following example declares a policy named ha-all which matches the queues whose names begin with "ha." and configures mirroring to all nodes in the cluster (see To How Many Nodes to Mirror? above):

rabbitmqctl
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
rabbitmqctl (Windows)
rabbitmqctl set_policy ha-all "^ha\." "{""ha-mode"":""all""}"
HTTP API
PUT /api/policies/%2f/ha-all
{"pattern":"^ha\.", "definition":{"ha-mode":"all"}}
Web UI
  • Navigate to Admin > Policies > Add / update a policy.
  • Enter "ha-all" next to Name, "^ha\." next to Pattern, and "ha-mode" = "all" in the first line next to Policy.
  • Click Add policy.

Policy where queues whose names begin with "two." are mirrored to any two nodes in the cluster, with automatic synchronisation:

rabbitmqctl
rabbitmqctl set_policy ha-two "^two\." \
   '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
rabbitmqctl (Windows)
rabbitmqctl set_policy ha-two "^two\." ^
   "{""ha-mode"":""exactly"",""ha-params"":2,""ha-sync-mode"":""automatic""}"
HTTP API
PUT /api/policies/%2f/ha-two
{"pattern":"^two\.", "definition":{"ha-mode":"exactly", "ha-params":2,"ha-sync-mode":"automatic"}}
Web UI
  • Navigate to Admin > Policies > Add / update a policy.
  • Enter "ha-two" next to Name and "^two\." next to Pattern.
  • Enter "ha-mode" = "exactly" in the first line next to Policy, then "ha-params" = 2 in the second line, then "ha-sync-mode" = "automatic" in the third, and set the type on the second line to "Number".
  • Click Add policy.

Policy where queues whose names begin with "nodes." are mirrored to specific nodes in the cluster:

rabbitmqctl
rabbitmqctl set_policy ha-nodes "^nodes\." \
   '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
rabbitmqctl (Windows)
rabbitmqctl set_policy ha-nodes "^nodes\." ^
   "{""ha-mode"":""nodes"",""ha-params"":[""rabbit@nodeA"", ""rabbit@nodeB""]}"
HTTP API
PUT /api/policies/%2f/ha-nodes
{"pattern":"^nodes\.", "definition":{"ha-mode":"nodes", "ha-params":["rabbit@nodeA", "rabbit@nodeB"]}}
Web UI
  • Navigate to Admin > Policies > Add / update a policy.
  • Enter "ha-nodes" next to Name and "^nodes\." next to Pattern.
  • Enter "ha-mode" = "nodes" in the first line next to Policy, then "ha-params" in the second line, set the second line's type to "List", and then enter "rabbit@nodeA" and "rabbit@nodeB" in the sublist which appears.
  • Click Add policy.

Mirrored Queue Implementation and Semantics

As discussed, for each mirrored queue there is one master and several mirrors, each on a different node. The mirrors apply the operations that occur to the master in exactly the same order as the master and thus maintain the same state. All actions other than publishes go only to the master, and the master then broadcasts the effect of the actions to the mirrors. Thus clients consuming from a mirrored queue are in fact consuming from the master.

Should a mirror fail, there is little to be done other than some bookkeeping: the master remains the master and no client need take any action or be informed of the failure. Note that mirror failures may not be detected immediately and the interruption of the per-connection flow control mechanism can delay message publication. The details are described in the Flow Control section below.

If the master fails, then one of the mirrors will be promoted to master as follows:

  1. The longest running mirror is promoted to master, the assumption being that it is most likely to be fully synchronised with the master. If there is no mirror that is synchronised with the master, messages that only existed on master will be lost.
  2. The mirror considers all previous consumers to have been abruptly disconnected. It requeues all messages that have been delivered to clients but are pending acknowledgement. This can include messages for which a client has issued acknowledgements, say, if an acknowledgement was either lost on the wire before reaching the node hosting the queue master, or it was lost when broadcast from the master to the mirrors. In either case, the new master has no choice but to requeue all messages that it has not seen acknowledgements for.
  3. Consumers that have requested to be notified when a queue fails over will be notified of cancellation.
  4. As a result of the requeuing, clients that re-consume from the queue must be aware that they are likely to subsequently receive messages that they have already received.
  5. As the chosen mirror becomes the master, no messages that are published to the mirrored queue during this time will be lost (barring subsequent failures on the promoted node). Messages published to a node that hosts a queue mirror are routed to the queue master and then replicated to all mirrors. Should the master fail, the messages continue to be sent to the mirrors and will be added to the queue once the promotion of a mirror to the master completes.
  6. Messages published by clients using publisher confirms will still be confirmed even if the master (or any mirrors) fail between the message being published and a confirmation received by the publisher. From the point of view of the publisher, publishing to a mirrored queue is no different from publishing to a non-mirrored one.
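
The first two steps above can be sketched as follows. This is a simplified model under stated assumptions, not RabbitMQ's implementation: the class is hypothetical, mirrors are passed in ordered oldest first, and putting requeued messages back at the head of the queue in their original order is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of mirror promotion; not RabbitMQ's code.
final class FailoverSketch {

    // Step 1: the longest-running mirror is promoted to master.
    static String promote(List<String> mirrorsOldestFirst) {
        if (mirrorsOldestFirst.isEmpty()) {
            throw new IllegalStateException("no mirror to promote");
        }
        return mirrorsOldestFirst.get(0);
    }

    // Step 2: messages delivered but not known to be acknowledged are requeued.
    // Placing them ahead of the ready messages is an assumption of this sketch.
    static Deque<String> requeue(List<String> unacked, Deque<String> ready) {
        Deque<String> result = new ArrayDeque<>(unacked);
        result.addAll(ready);
        return result;
    }

    public static void main(String[] args) {
        String newMaster = promote(Arrays.asList("rabbit@nodeB", "rabbit@nodeC"));
        Deque<String> ready = new ArrayDeque<>(Arrays.asList("m3", "m4"));
        Deque<String> afterFailover = requeue(Arrays.asList("m1", "m2"), ready);
        System.out.println(newMaster + " now serves " + afterFailover);
    }
}
```

Consumers that had already processed m1 or m2 would see them again, which is why step 4 asks clients to tolerate duplicates.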

If you are consuming from a mirrored queue with noAck=true (i.e. the client is not sending message acknowledgements), then messages can be lost. This is no different from the norm of course: the broker considers a message acknowledged as soon as it has been sent to a noAck=true consumer. Should the client disconnect abruptly, the message may never be received. In the case of a mirrored queue, should the master die, messages that are in-flight on their way to noAck=true consumers may never be received by those clients, and will not be requeued by the new master. Because of the possibility that the consuming client is connected to a node that survives, the consumer cancellation notification is useful to identify when such events may have occurred. Of course, in practice, if you care about not losing messages, then you are advised to consume with noAck=false.

Publisher Confirms and Transactions

Mirrored queues support both publisher confirms and transactions. The semantics chosen are that in the case of both confirms and transactions, the action spans all mirrors of the queue. So in the case of a transaction, a tx.commit-ok will only be returned to a client when the transaction has been applied across all mirrors of the queue. Equally, in the case of publisher confirms, a message will only be confirmed to the publisher when it has been accepted by all of the mirrors. It is correct to think of the semantics as being the same as those of a message routed to multiple normal queues, and of a transaction whose publications are similarly routed to multiple queues.

Flow Control

RabbitMQ uses a credit-based algorithm to limit the rate of message publication. Publishers are permitted to publish when they receive credit from all mirrors of a queue. Credit in this context means permission to publish. Mirrors that fail to issue credit can cause publishers to stall. Publishers will remain blocked until all mirrors issue credit or until the remaining nodes consider the mirror to be disconnected from the cluster. Erlang detects such disconnections by periodically sending a tick to all nodes. The tick interval can be controlled with the net_ticktime configuration setting.
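
The stall described above can be pictured with a toy credit gate. The sketch is a deliberate simplification and not how RabbitMQ implements flow control: the class and its methods are hypothetical, and credit is modelled as a plain per-mirror counter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical credit gate for illustration; the real mechanism is more involved.
final class CreditSketch {
    private final Map<String, Integer> credit = new HashMap<>();

    // A newly tracked mirror starts with no credit issued.
    void addMirror(String mirror) {
        credit.put(mirror, 0);
    }

    // The mirror issues `amount` units of credit to the publisher.
    void grant(String mirror, int amount) {
        credit.merge(mirror, amount, Integer::sum);
    }

    // Once the cluster considers a mirror disconnected, it no longer blocks publishing.
    void markDisconnected(String mirror) {
        credit.remove(mirror);
    }

    // Publishing is allowed only while every tracked mirror has credit left.
    boolean canPublish() {
        for (int c : credit.values()) {
            if (c <= 0) {
                return false;
            }
        }
        return true;
    }

    // Consumes one unit of credit per mirror; returns false if the publisher is blocked.
    boolean publish() {
        if (!canPublish()) {
            return false;
        }
        credit.replaceAll((mirror, c) -> c - 1);
        return true;
    }

    public static void main(String[] args) {
        CreditSketch gate = new CreditSketch();
        gate.addMirror("rabbit@nodeB");
        gate.addMirror("rabbit@nodeC");
        gate.grant("rabbit@nodeB", 1);
        System.out.println("blocked by nodeC: " + !gate.publish());
        gate.markDisconnected("rabbit@nodeC");
        System.out.println("published after nodeC was dropped: " + gate.publish());
    }
}
```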

Master Failures and Consumer Cancellation

Clients that are consuming from a mirrored queue may wish to know that the queue from which they have been consuming has failed over. When a mirrored queue fails over, knowledge of which messages have been sent to which consumer is lost, and therefore all unacknowledged messages are redelivered with the redelivered flag set. Consumers may wish to know this is going to happen.

If so, they can consume with the argument x-cancel-on-ha-failover set to true. Their consuming will then be cancelled on failover and a consumer cancellation notification sent. It is then the consumer's responsibility to reissue basic.consume to start consuming again.

For example (in Java):

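import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import java.util.HashMap;
import java.util.Map;
Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
// Ask the broker to cancel this consumer if the queue fails over.
args.put("x-cancel-on-ha-failover", true);
channel.basicConsume("my-queue", false, args, consumer);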

This creates a new consumer with the argument set.

Unsynchronised Mirrors

A node may join a cluster at any time. Depending on the configuration of a queue, when a node joins a cluster, queues may add a mirror on the new node. At this point, the new mirror will be empty: it will not contain any existing contents of the queue. Such a mirror will receive new messages published to the queue, and thus over time will accurately represent the tail of the mirrored queue. As messages are drained from the mirrored queue, the head of the queue, for which the new mirror is missing messages, will shrink until eventually the mirror's contents precisely match the master's contents. At this point, the mirror can be considered fully synchronised, but it is important to note that this has occurred because of actions of clients in terms of draining the pre-existing head of the queue.

A newly added mirror provides no additional form of redundancy or availability of the queue's contents that existed before the mirror was added, unless the queue has been explicitly synchronised. Since the queue becomes unresponsive while explicit synchronisation is occurring, it is preferable to allow active queues from which messages are being drained to synchronise naturally, and only explicitly synchronise inactive queues.
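
Natural synchronisation can be illustrated with a small counter model. This is a sketch under simplifying assumptions, not the broker's logic: the class is hypothetical, and "consume" stands for a message being fully drained (delivered and acknowledged) from the head of the master.

```java
// Hypothetical model of a mirror catching up by draining; not RabbitMQ's code.
final class NaturalSyncSketch {
    private int missingOnMirror; // head messages the new mirror never received
    private int sharedTail;      // messages published after the mirror joined

    NaturalSyncSketch(int masterDepthAtJoin) {
        this.missingOnMirror = masterDepthAtJoin;
    }

    // New publishes reach both master and mirror, so they do not widen the gap.
    void publish() {
        sharedTail++;
    }

    // Draining takes from the head first, which is the part the mirror lacks.
    void consume() {
        if (missingOnMirror > 0) {
            missingOnMirror--;
        } else if (sharedTail > 0) {
            sharedTail--;
        }
    }

    boolean synchronised() {
        return missingOnMirror == 0;
    }

    int masterDepth() {
        return missingOnMirror + sharedTail;
    }

    public static void main(String[] args) {
        NaturalSyncSketch queue = new NaturalSyncSketch(2);
        queue.publish();
        System.out.println("synchronised right after join: " + queue.synchronised());
        queue.consume();
        queue.consume();
        System.out.println("synchronised after draining the old head: " + queue.synchronised());
    }
}
```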

You can determine which mirrors are synchronised with the following rabbitmqctl invocation:

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

You can manually synchronise a queue with:

rabbitmqctl sync_queue name

And you can cancel synchronisation with:

rabbitmqctl cancel_sync_queue name

These features are also available through the management plugin.

Stopping Nodes and Synchronisation

If you stop a RabbitMQ node which contains the master of a mirrored queue, some mirror on some other node will be promoted to the master (assuming there is a synchronised mirror; see below). If you continue to stop nodes then you will reach a point where a mirrored queue has no more mirrors: it exists only on one node, which is now its master. If the mirrored queue was declared durable then, if its last remaining node is shut down, durable messages in the queue will survive the restart of that node. In general, as you restart other nodes, if they were previously part of a mirrored queue then they will rejoin the mirrored queue.

However, there is currently no way for a mirror to know whether or not its queue contents have diverged from the master to which it is rejoining (this could happen during a network partition, for example). As such, when a mirror rejoins a mirrored queue, it throws away any durable local contents it already has and starts empty. Its behaviour is at this point the same as if it were a new node joining the cluster.

Stopping Master Nodes with Only Unsynchronised Mirrors

It's possible that all available mirrors are unsynchronised when you shut down a master node. A common situation in which this can occur is rolling cluster upgrades. By default, RabbitMQ will refuse to fail over to an unsynchronised mirror on controlled master shutdown (i.e. explicit stop of the RabbitMQ service or shutdown of the OS) in order to avoid message loss; instead the entire queue will shut down as if the unsynchronised mirrors were not there. An uncontrolled master shutdown (i.e. server or node crash, or network outage) will still trigger a failover even to an unsynchronised mirror.

If you would prefer to have master nodes fail over to unsynchronised mirrors in all circumstances (i.e. you would choose availability of the queue over avoiding message loss) then you can set the ha-promote-on-shutdown policy key to always rather than its default value of when-synced.
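
The rules in this section can be summarised as a small decision function. The sketch below is a hypothetical restatement of the prose, not broker code, and it assumes at least one mirror (synchronised or not) exists when the master is lost.

```java
// Hypothetical restatement of the promotion rules; not RabbitMQ's code.
final class PromoteOnShutdownSketch {

    enum Outcome { PROMOTE_SYNCHRONISED, PROMOTE_UNSYNCHRONISED, QUEUE_SHUTS_DOWN }

    // controlledShutdown: explicit service stop or OS shutdown, as opposed to a crash.
    // haPromoteOnShutdown: value of the policy key, "when-synced" (default) or "always".
    static Outcome onMasterLoss(boolean controlledShutdown,
                                boolean synchronisedMirrorExists,
                                String haPromoteOnShutdown) {
        if (synchronisedMirrorExists) {
            return Outcome.PROMOTE_SYNCHRONISED;
        }
        // Only unsynchronised mirrors remain from here on.
        if (controlledShutdown && !"always".equals(haPromoteOnShutdown)) {
            return Outcome.QUEUE_SHUTS_DOWN;
        }
        return Outcome.PROMOTE_UNSYNCHRONISED;
    }

    public static void main(String[] args) {
        System.out.println(onMasterLoss(true, false, "when-synced"));
        System.out.println(onMasterLoss(false, false, "when-synced"));
        System.out.println(onMasterLoss(true, false, "always"));
    }
}
```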

Loss of a Master While All Mirrors are Stopped

It is possible to lose the master for a queue while all mirrors for the queue are shut down. In normal operation the last node for a queue to shut down will become the master, and we want that node to still be the master when it starts again (since it may have received messages that no other mirror saw).

However, when you invoke rabbitmqctl forget_cluster_node, RabbitMQ will attempt to find a currently stopped mirror for each queue which has its master on the node we are forgetting, and "promote" that mirror to be the new master when it starts up again. If there is more than one candidate, the most recently stopped mirror will be chosen.

It's important to understand that RabbitMQ can only promote stopped mirrors during forget_cluster_node, since any mirrors that are started again will clear out their contents as described at "stopping nodes and synchronisation" above. Therefore when removing a lost master in a stopped cluster, you must invoke rabbitmqctl forget_cluster_node before starting mirrors again.

Batch Synchronisation

Since RabbitMQ 3.6.0, masters perform synchronisation in batches. The batch size can be configured via the ha-sync-batch-size queue argument. Earlier versions will synchronise 1 message at a time by default. By synchronising messages in batches, the synchronisation process can be sped up considerably.

To choose the right value for ha-sync-batch-size you need to consider:

  • average message size
  • network throughput between RabbitMQ nodes
  • net_ticktime value

For example, if you set ha-sync-batch-size to 50000 messages, and each message in the queue is 1KB, then each synchronisation message between nodes will be ~49MB. You need to make sure that your network between queue mirrors can accommodate this kind of traffic. If the network takes longer than net_ticktime to send one batch of messages, then nodes in the cluster could think they are in the presence of a network partition.
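
The arithmetic in this example can be checked with a short sketch. The class is hypothetical, and the throughput and net_ticktime figures in main are assumed values chosen for illustration; the estimate also ignores protocol overhead and competing traffic.

```java
// Hypothetical sizing helper; the figures in main() are illustrative assumptions.
final class SyncBatchSketch {

    // Payload size of one synchronisation batch, in bytes.
    static long batchBytes(int batchSize, long avgMessageBytes) {
        return (long) batchSize * avgMessageBytes;
    }

    // Rough time to send `bytes` at a sustained throughput, ignoring overhead.
    static double transferSeconds(long bytes, double bytesPerSecond) {
        return bytes / bytesPerSecond;
    }

    // True when one batch can be sent in less time than net_ticktime.
    static boolean fitsWithinTick(int batchSize, long avgMessageBytes,
                                  double bytesPerSecond, double netTicktimeSeconds) {
        long bytes = batchBytes(batchSize, avgMessageBytes);
        return transferSeconds(bytes, bytesPerSecond) < netTicktimeSeconds;
    }

    public static void main(String[] args) {
        long bytes = batchBytes(50000, 1024); // 50000 messages of 1KB each
        System.out.println("batch size in MB: " + bytes / (1024.0 * 1024.0));
        // Assumed link speeds (bytes per second) and an assumed net_ticktime of 60 seconds.
        System.out.println("fast link ok: " + fitsWithinTick(50000, 1024, 125000000.0, 60.0));
        System.out.println("slow link ok: " + fitsWithinTick(50000, 1024, 500000.0, 60.0));
    }
}
```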

Configuring Synchronisation

Let's start with the most important aspect of queue synchronisation: while a queue is being synchronised, all other queue operations will be blocked. Depending on multiple factors, a queue might be blocked by synchronisation for many minutes or hours, and in extreme cases even days.

Queue synchronisation can be configured as follows:

  • ha-sync-mode: manual - this is the default mode. A new queue mirror will not receive existing messages; it will only receive new messages. The new queue mirror will become an exact replica of the master over time, once consumers have drained messages that only exist on the master. If the master queue fails before all unsynchronised messages are drained, those messages will be lost. You can fully synchronise a queue manually; refer to the Unsynchronised Mirrors section for details.
  • ha-sync-mode: automatic - a queue will automatically synchronise when a new mirror joins. It is worth reiterating that queue synchronisation is a blocking operation. If queues are small, or you have a fast network between RabbitMQ nodes and ha-sync-batch-size has been tuned, this is a good choice.