Scheduling Messages with RabbitMQ
For a while, people have looked for ways to implement delayed messaging with RabbitMQ. So far, the accepted solution has been to combine message TTL with Dead Letter Exchanges, as proposed by James Carr. We have wanted to offer an out-of-the-box solution for some time, and this past month we had the time to implement it as a plugin. Enter the RabbitMQ Delayed Message Plugin.
The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the user chooses to do so. Let's see how it works.
Installing the Plugin
To install the plugin go to our Community Plugins page and download the corresponding .ez files for your RabbitMQ installation. Copy the plugin into RabbitMQ's plugin folder and then enable it by running the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Once the plugin has been enabled, we are ready to start using it.
Using the Exchange
To use the Delayed Message Exchange you just need to declare an
exchange providing the
"x-delayed-message" exchange type as follows:
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
Later on we will explain the meaning of the special argument
"x-delayed-type" that we provided in our exchange declaration.
To delay a message, a user must publish it with the special header
x-delay, which takes an integer representing the number
of milliseconds the message should be delayed by RabbitMQ. It's worth
noting that here delay means: delay message routing to queues or to
other exchanges. The exchange has no concept of consumers. So once the delay has expired, the plugin will attempt to route the message to the queues matching the routing rules of the exchange and the ones assigned to the message. Be aware that if the message can't be routed to any queue, it will be discarded, as AMQP specifies for unroutable messages.
Here's some sample code that adds the
x-delay header to a message
and publishes to our exchange.
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
// x-delay is the delay in milliseconds
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
In the previous example, the message will be delayed for five seconds before it gets routed by the plugin. That example assumes you have established a connection to RabbitMQ and obtained a channel.
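Since x-delay is a relative number of milliseconds, scheduling a message for a particular point in time means converting that time into a delay first. The following is a minimal sketch of that conversion. DelayHeaders and its methods are hypothetical names, not part of the plugin or the RabbitMQ Java client, and clamping the value to the int range is an assumption made to match the integer used in the example above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the plugin or the RabbitMQ Java client.
final class DelayHeaders {
    private DelayHeaders() {}

    // Milliseconds from 'now' until 'deliverAt'.
    // Times in the past yield 0, and the result is clamped to the int range
    // (an assumption, chosen to match the integer x-delay used in the article).
    static int delayMillis(Instant now, Instant deliverAt) {
        long millis = Duration.between(now, deliverAt).toMillis();
        return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, millis));
    }

    // Builds a headers map carrying x-delay, suitable for props.headers(...).
    static Map<String, Object> forDelivery(Instant now, Instant deliverAt) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis(now, deliverAt));
        return headers;
    }
}
```

The resulting map can be passed to props.headers(...) exactly as in the publishing example, for instance DelayHeaders.forDelivery(Instant.now(), deliverAt).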
When we declared the exchange above, we provided an
x-delayed-type argument set to
direct. That tells the exchange what
kind of behaviour we want it to have when routing messages, creating
bindings, and so on. In the example, our exchange will behave like the
direct exchange, but we could pass topic, fanout, or a custom
exchange type provided by some other plugin instead. By doing this we don't
limit the user on what kind of routing behaviour the delayed message
exchange can provide.
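To illustrate choosing a different routing behaviour, here is a small sketch that builds the declaration arguments for any underlying exchange type. DelayedExchangeArgs and withRouting are hypothetical names introduced for illustration only. The argument key and the exchange type string are the ones used in the declaration shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the plugin or the RabbitMQ Java client.
final class DelayedExchangeArgs {
    // Exchange type registered by the plugin, as used in the article.
    static final String EXCHANGE_TYPE = "x-delayed-message";

    private DelayedExchangeArgs() {}

    // Builds the arguments map that selects the routing behaviour,
    // e.g. "direct", "topic", "fanout", or a type added by another plugin.
    static Map<String, Object> withRouting(String routingType) {
        if (routingType == null || routingType.isEmpty()) {
            throw new IllegalArgumentException("routingType must not be empty");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", routingType);
        return args;
    }
}
```

With an open channel, a delayed exchange that routes like a topic exchange could then be declared along the lines of channel.exchangeDeclare("my-topic-exchange", DelayedExchangeArgs.EXCHANGE_TYPE, true, false, DelayedExchangeArgs.withRouting("topic")), where the exchange name is only an example.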
Checking if a Message was Delayed
Once we receive a message on the consumer side, how can we tell if the
message was delayed or not? The plugin will keep the
x-delay header, but will negate the passed value. So if you published a
message with a
5000 milliseconds delay, the consumer receiving said
message will find the
x-delay header set to
-5000.
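On the consumer side, that check could be sketched as follows. DelayInspector and originalDelay are hypothetical names, not part of the plugin or the client library. The sketch assumes the headers arrive as a plain Map, which may be null when the message carries no headers, and that the header value is some numeric type.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of the plugin or the RabbitMQ Java client.
final class DelayInspector {
    private DelayInspector() {}

    // Returns the original delay in milliseconds if the message carries a
    // negated (negative) x-delay header; returns empty otherwise.
    static OptionalLong originalDelay(Map<String, Object> headers) {
        if (headers == null) {
            return OptionalLong.empty(); // message has no headers at all
        }
        Object value = headers.get("x-delay");
        if (value instanceof Number) {
            long delay = ((Number) value).longValue();
            if (delay < 0) {
                return OptionalLong.of(-delay); // undo the negation
            }
        }
        return OptionalLong.empty();
    }
}
```

In a consumer callback, the map would typically come from the delivered message's properties, for example DelayInspector.originalDelay(properties.getHeaders()).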