Skip to main content

Scheduling Messages with RabbitMQ

· 4 min read
Alvaro Videla

For a while people have looked for ways of implementing delayed messaging with RabbitMQ. So far the accepted solution was to use a mix of message TTL and Dead Letter Exchanges as proposed by James Carr here. Since a while we have thought to offer an out-of-the-box solution for this, and these past month we had the time to implement it as a plugin. Enter RabbitMQ Delayed Message Plugin.

The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the users choses to do so. Let's see how it works.

Installing the Plugin

To install the plugin go to our Community Plugins page and download the corresponding .ez files for your RabbitMQ installation. Copy the plugin into RabbitMQ's plugin folder and then enable it by running the following command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Once the plugin has been enabled, we are ready to start using it.

Using the Exchange

To use the Delayed Message Exchange you just need to declare an exchange providing the "x-delayed-message" exchange type as follows:

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

Later on we will explain the meaning of the special argument "x-delayed-type" that we provided in our exchange declaration.

Delaying Messages

To delay a message a user must publish the message with the special header called x-delay which takes an integer representing the number of milliseconds the message should be delayed by RabbitMQ. It's worth noting that here delay means: delay message routing to queues or to other exchanges. The exchange has no concept of consumers. So once the delay expired, the plugin will attempt to route the message to the queues matching the routing rules of the exchange and the once assigned to the message. Be aware that if the message can't be routed to any queue, then it will be discarded, as is specified by AMQP with unroutable messages. Here's some sample code that adds the x-delay header to a message and publishes to our exchange.

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

In the previous example, the message will be delayed for five seconds before it gets routed by the plugin. That example assumes you have established a connection to RabbitMQ and obtained a channel.

Flexible Routing

When we declared the exchange above, we provided an x-delayed-type argument set to direct. What that does is to tell the exchange what kind of behaviour we want it to have when routing messages, creating bindings, and so on. In the example, our exchange will behave like the direct exchange, but we could pass there topic, fanout, or a custom exchange type provided by some other plugin. By doing this we don't limit the user on what kind of routing behaviour the delayed message plugin offers.

Checking if a Message was Delayed

Once we receive a message on the consumer side, how can we tell if the message was delayed or not? The plugin will keep the x-delay message header, but will negate the passed value. So if you published a message with a 5000 milliseconds delay, the consumer receiving said message will find the x-delay header set to -5000

We need feedback

We have released the plugin as experimental to gather feedback from the community. Please use it and report back to us on the plugin's issue page or on our official mailing list.

Learn More