Publish/Subscribe

(using the spring-amqp client)

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will need adjusting.

Where to get help

If you're having trouble going through this tutorial you can contact us through the mailing list.

In the first tutorial we showed how to use start.spring.io to leverage Spring Initializr to create a project with the RabbitMQ starter dependency for creating spring-amqp applications.

In the previous tutorial we created a new package (tut2) to place our config, sender and receiver and created a work queue with two consumers. The assumption behind a work queue is that each task is delivered to exactly one worker.

In this part we'll implement the fanout pattern to deliver a message to multiple consumers. This pattern is known as "publish/subscribe" and is implemented by configuring a number of beans in our Tut3Config file.

Essentially, published messages are going to be broadcast to all the receivers.

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.

Let's quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.

[Diagram: a producer (P) publishes to an exchange (X), which routes messages to two queues.]

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's configure a bean to describe an exchange of this type, and call it tut.fanout:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut3", "pub-sub", "publish-subscribe"})
@Configuration
public class Tut3Config {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("tut.fanout");
    }

    @Profile("receiver")
    private static class ReceiverConfig {

        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding binding1(FanoutExchange fanout,
            Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
        }

        @Bean
        public Binding binding2(FanoutExchange fanout,
            Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
        }

        @Bean
        public Tut3Receiver receiver() {
            return new Tut3Receiver();
        }
    }

    @Profile("sender")
    @Bean
    public Tut3Sender sender() {
        return new Tut3Sender();
    }
}

We follow the same approach as in the previous two tutorials. We create three profiles for the tutorial ("tut3", "pub-sub", or "publish-subscribe"); they are all synonyms for running the fanout tutorial. Next we configure the FanoutExchange as a bean. Within the "receiver" profile (the ReceiverConfig class) we define four beans in addition to the Tut3Receiver itself: two AnonymousQueues (autoDeleteQueue1 and autoDeleteQueue2) and two bindings that bind those queues to the exchange.

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for fanning out our messages.
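The broadcast behaviour can be sketched with a toy in-memory model. This is plain Java rather than RabbitMQ or spring-amqp code, and the class name FanoutSketch and its bind and publish methods are made up for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of a fanout exchange; NOT RabbitMQ or spring-amqp code.
class FanoutSketch {
    // Queues currently bound to this pretend exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue so that it receives a copy of every later message.
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Publish a message. The routing key is accepted but ignored,
    // mirroring how a fanout exchange treats it.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Queue<String> q1 = new ArrayDeque<>();
        Queue<String> q2 = new ArrayDeque<>();
        exchange.bind(q1);
        exchange.bind(q2);
        exchange.publish("ignored", "Hello.1");
        System.out.println(q1 + " " + q2); // prints [Hello.1] [Hello.1]
    }
}
```

Every bound queue ends up with its own copy of the message, which is the property the rest of this tutorial relies on.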

Listing exchanges

To list the exchanges on the server you can run the ever useful rabbitmqctl:

sudo rabbitmqctl list_exchanges

In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.

Nameless exchange

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

Recall how we published a message before:

   template.convertAndSend(queue.getName(), message);

Here no exchange is named, so the message goes to the default (nameless) exchange, and the first parameter is the routing key, which was the name of the queue autowired into the sender. With the default exchange, messages are routed to the queue with the name specified by routingKey, if it exists.
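The routing rule of the default exchange can be illustrated with a small in-memory model. This is a sketch in plain Java, not broker or spring-amqp code; DefaultExchangeSketch, declareQueue and publish are hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the default (nameless) exchange; NOT RabbitMQ code.
class DefaultExchangeSketch {
    // Declared queues, looked up by name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    // Declare a queue and return it so callers can inspect its contents.
    Queue<String> declareQueue(String name) {
        return queuesByName.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Route the message to the queue whose name equals the routing key.
    // Returns false when no such queue exists, in which case the message is dropped.
    boolean publish(String routingKey, String message) {
        Queue<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        Queue<String> hello = exchange.declareQueue("hello");
        System.out.println(exchange.publish("hello", "Hello World!")); // prints true
        System.out.println(exchange.publish("missing", "lost"));       // prints false
        System.out.println(hello);                                      // prints [Hello World!]
    }
}
```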

Now, we can publish to our named exchange instead:

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout;   // configured in Tut3Config above

template.convertAndSend(fanout.getName(), "", message);

From now on the fanout exchange will append messages to every queue bound to it.

Temporary queues

As you may remember, previously we were using queues which had a specified name (remember hello). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

But that's not the case for our fanout example. We want to hear about all messages, not just a subset of them. We're also interested only in currently flowing messages, not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

Secondly, once we disconnect the consumer the queue should be automatically deleted. To do this with the spring-amqp client, we defined an AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name:

@Bean
public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
    return new AnonymousQueue();
}

At this point our queues have randomly generated names. For example, a name may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
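The two requirements above (a fresh queue under a random name, and automatic deletion once the consumer goes away) can be sketched with a toy model. It is plain Java, not spring-amqp code, and the "tmp-" naming scheme and all method names are assumptions made for the example:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Toy model of temporary queues; NOT RabbitMQ or spring-amqp code.
class TemporaryQueueSketch {
    // Names of the queues that currently exist on this pretend broker.
    private final Set<String> queues = new HashSet<>();

    // Create a fresh, empty queue under a random name and return that name.
    String declareTemporaryQueue() {
        String name = "tmp-" + UUID.randomUUID(); // hypothetical naming scheme
        queues.add(name);
        return name;
    }

    // Auto-delete: once the consumer disconnects, the queue is removed.
    void consumerDisconnected(String queueName) {
        queues.remove(queueName);
    }

    boolean exists(String queueName) {
        return queues.contains(queueName);
    }

    public static void main(String[] args) {
        TemporaryQueueSketch broker = new TemporaryQueueSketch();
        String name = broker.declareTemporaryQueue();
        System.out.println(broker.exists(name)); // prints true
        broker.consumerDisconnected(name);
        System.out.println(broker.exists(name)); // prints false
    }
}
```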

Bindings

[Diagram: a producer (P) publishes to an exchange (X); each arrow from X to one of the two queues is labelled "binding".]

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between an exchange and a queue is called a binding. In the above Tut3Config you can see that we have two bindings, one for each AnonymousQueue.

@Bean
public Binding binding1(FanoutExchange fanout, 
        Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

Listing bindings

You can list existing bindings using, you guessed it,

rabbitmqctl list_bindings

Putting it all together

[Diagram: a producer (P) publishes to an exchange (X), which routes to two queues named amq.gen-RQ6... and amq.gen-As8...; each queue is read by its own consumer, C1 and C2.]

The producer program, which emits messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our fanout exchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here is the code for Tut3Sender.java:

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut3Sender {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    int dots = 0;

    int count = 0;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1;
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(Integer.toString(++count));
        String message = builder.toString();
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

Tut3Sender.java source
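To see which payloads the sender produces, the message-building logic of send() can be pulled out into a plain method and run without a broker. This is only a sketch: MessageSequenceSketch and next() are hypothetical names, and the scheduling and RabbitTemplate call are left out:

```java
// Reproduces the message-building logic of Tut3Sender.send() without Spring or a broker.
class MessageSequenceSketch {
    private int dots = 0;
    private int count = 0;

    // Builds the next payload: "Hello", then 1 to 3 dots, then a running counter.
    String next() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1; // wrap around after three dots
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(++count);
        return builder.toString();
    }

    public static void main(String[] args) {
        MessageSequenceSketch sequence = new MessageSequenceSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println(sequence.next());
        }
        // prints Hello.1, Hello..2, Hello...3, Hello.4, Hello..5 (one per line)
    }
}
```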

As you can see, we leverage the beans from the Tut3Config file and autowire in the RabbitTemplate along with our configured FanoutExchange. This step is necessary because publishing to a non-existent exchange is forbidden.

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.
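The effect of publishing before any queue is bound can be illustrated with a toy model. Again this is plain Java with made-up names (UnboundPublishSketch, bind, publish), not the behaviour of any real client API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model showing that a message published before any binding exists is discarded.
class UnboundPublishSketch {
    // Queues bound so far; empty until a receiver binds one.
    private final List<List<String>> boundQueues = new ArrayList<>();

    void bind(List<String> queue) {
        boundQueues.add(queue);
    }

    // Returns how many queues got a copy; 0 means the message was discarded.
    int publish(String message) {
        for (List<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        UnboundPublishSketch exchange = new UnboundPublishSketch();
        System.out.println(exchange.publish("Hello.1"));  // prints 0: nobody is bound yet
        List<String> queue = new ArrayList<>();
        exchange.bind(queue);
        System.out.println(exchange.publish("Hello..2")); // prints 1
        System.out.println(queue);                        // prints [Hello..2]
    }
}
```

The queue bound later never sees the earlier message, which matches the "only currently flowing messages" requirement of this tutorial.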

The code for Tut3Receiver.java:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut3Receiver {

    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " 
            + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }

}

Tut3Receiver.java source
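The amount of simulated work per message follows directly from doWork: one second of sleep per dot. A minimal sketch that computes this without sleeping (WorkTimeSketch and workSeconds are hypothetical names) could look like:

```java
// Computes how many seconds Tut3Receiver.doWork would sleep for a given message.
class WorkTimeSketch {
    // One second per '.' character in the payload.
    static int workSeconds(String in) {
        int seconds = 0;
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(workSeconds("Hello.1"));   // prints 1
        System.out.println(workSeconds("Hello...3")); // prints 3
    }
}
```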

Compile as before and we're ready to execute the fanout sender and receiver.

mvn clean package

And of course, to execute the tutorial do the following:

java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar --spring.profiles.active=pub-sub,receiver \
    --tutorial.client.duration=60000
java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar --spring.profiles.active=pub-sub,sender \
    --tutorial.client.duration=60000

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With the receiver running (its two listeners, one per queue) you should see something like:

sudo rabbitmqctl list_bindings
tut.fanout  exchange    8b289c9c-a1eb-4a3a-b6a9-163c4fdcb6c2    queue       []
tut.fanout  exchange    d7e7d193-65b1-4128-a532-466a5256fd31    queue       []

The interpretation of the result is straightforward: data from the tut.fanout exchange goes to two queues with generated names. And that's exactly what we intended.

To find out how to listen for a subset of messages, let's move on to tutorial 4.