RabbitMQ tutorial - Publish/Subscribe
Publish/Subscribe
(using the AMQP 1.0 Go client)
Prerequisites
This tutorial assumes RabbitMQ is installed and running on
localhost on the standard port (5672). If you use a different
host, port, or credentials, the connection settings will require
adjusting.
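For example, a connection URI for a broker on a different host with non-default credentials might look like the following (the hostname and credentials here are placeholders, not real values):

```go
// Hypothetical values -- substitute your own host, port, and credentials.
const brokerURI = "amqp://myuser:mypass@rabbit.example.com:5672/"
```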
Where to get help
If you're having trouble going through this tutorial you can contact us through GitHub Discussions or RabbitMQ community Discord.
In the previous tutorial we created a work queue. We assumed that the tasks were distributed among multiple workers. In this tutorial we'll do something completely different — we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs — the first will emit log messages, and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to the disk, and at the same time run another receiver in the terminal and see the logs printed there.
Essentially, published log messages are going to be broadcast to all the receivers.
Exchanges
In the previous tutorials we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in RabbitMQ.
Let's quickly cover what we've learned:
- A producer is a user application that sends messages.
- A queue is a buffer that stores messages.
- A consumer is a user application that receives messages.
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to a queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the
last one — the fanout. Let's create an exchange of this type:
_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
if err != nil {
	log.Panicf("Failed to declare an exchange: %v", err)
}
The fanout exchange broadcasts all the messages it receives to all the
queues it knows about. That's exactly what we need for our logger.
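The routing rule is easy to model in plain Go. The sketch below is only an illustration of the copy-to-every-bound-queue behavior, not part of the tutorial's client code; all names in it are made up for the example:

```go
package main

import "fmt"

// Toy in-memory model of a fanout exchange: every bound queue receives
// its own copy of each published message. The real routing happens
// inside the broker; this only illustrates the rule.
type fanout struct {
	queues map[string][]string // queue name -> messages delivered to it
}

func (f *fanout) bind(queue string) {
	if f.queues == nil {
		f.queues = make(map[string][]string)
	}
	f.queues[queue] = []string{}
}

func (f *fanout) publish(msg string) {
	for name := range f.queues {
		f.queues[name] = append(f.queues[name], msg) // one copy per queue
	}
}

func main() {
	ex := &fanout{}
	ex.bind("to-disk")
	ex.bind("to-terminal")
	ex.publish("Here is the first log")
	fmt.Println(ex.queues["to-disk"], ex.queues["to-terminal"])
	// Both queues hold their own copy of the same message.
}
```

The key point is that a fanout exchange ignores any routing key: binding alone decides who gets a copy.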
Bindings
We've already created a fanout exchange and a queue. Now we need to tell the
exchange to send messages to our queue. That relationship between an exchange
and a queue is called a binding.
qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{
	IsExclusive:  true,
	IsAutoDelete: true,
})
if err != nil {
	log.Panicf("Failed to declare a queue: %v", err)
}

_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
	SourceExchange:   "logs",
	DestinationQueue: qInfo.Name(),
	BindingKey:       "",
})
if err != nil {
	log.Panicf("Failed to bind a queue: %v", err)
}
From now on the logs exchange will append messages to our queue. An exclusive queue is only accessible to the connection that declared it and is deleted when that connection closes.
Let's run this code. We'll use go run to start the logger consumer:
go run receive_logs.go
Now let's emit some logs and publish a message to the logs exchange:
go run emit_log.go "Here is the first log"
Putting it all together
The producer emit_log.go:
package main

import (
	"context"
	"log"
	"os"
	"strings"

	rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

const brokerURI = "amqp://guest:guest@localhost:5672/"

func main() {
	ctx := context.Background()

	env := rmq.NewEnvironment(brokerURI, nil)
	conn, err := env.NewConnection(ctx)
	if err != nil {
		log.Panicf("Failed to connect to RabbitMQ: %v", err)
	}
	defer func() {
		_ = env.CloseConnections(context.Background())
	}()

	_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
	if err != nil {
		log.Panicf("Failed to declare an exchange: %v", err)
	}

	publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{Exchange: "logs", Key: ""}, nil)
	if err != nil {
		log.Panicf("Failed to create publisher: %v", err)
	}
	defer func() { _ = publisher.Close(context.Background()) }()

	body := bodyFrom(os.Args)
	res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
	if err != nil {
		log.Panicf("Failed to publish a message: %v", err)
	}
	switch res.Outcome.(type) {
	case *rmq.StateAccepted:
	default:
		log.Panicf("Unexpected publish outcome: %v", res.Outcome)
	}
	log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}
And the consumer receive_logs.go:
package main

import (
	"context"
	"errors"
	"log"

	rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

const brokerURI = "amqp://guest:guest@localhost:5672/"

func main() {
	ctx := context.Background()

	env := rmq.NewEnvironment(brokerURI, nil)
	conn, err := env.NewConnection(ctx)
	if err != nil {
		log.Panicf("Failed to connect to RabbitMQ: %v", err)
	}
	defer func() {
		_ = env.CloseConnections(context.Background())
	}()

	_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
	if err != nil {
		log.Panicf("Failed to declare an exchange: %v", err)
	}

	qInfo, err := conn.Management().DeclareQueue(ctx, &rmq.AutoGeneratedQueueSpecification{
		IsExclusive:  true,
		IsAutoDelete: true,
	})
	if err != nil {
		log.Panicf("Failed to declare a queue: %v", err)
	}

	_, err = conn.Management().Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
		SourceExchange:   "logs",
		DestinationQueue: qInfo.Name(),
		BindingKey:       "",
	})
	if err != nil {
		log.Panicf("Failed to bind a queue: %v", err)
	}

	consumer, err := conn.NewConsumer(ctx, qInfo.Name(), nil)
	if err != nil {
		log.Panicf("Failed to create consumer: %v", err)
	}
	defer func() { _ = consumer.Close(context.Background()) }()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

	for {
		delivery, err := consumer.Receive(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return
			}
			log.Panicf("Failed to receive a message: %v", err)
		}
		msg := delivery.Message()
		var body string
		if len(msg.Data) > 0 {
			body = string(msg.Data[0])
		}
		log.Printf(" [x] %s", body)

		err = delivery.Accept(ctx)
		if err != nil {
			log.Panicf("Failed to accept message: %v", err)
		}
	}
}
To try this, open a terminal and run the receiver:
go run receive_logs.go
# => [*] Waiting for logs. To exit press CTRL+C
Then, in another terminal (or several), run the emitter:
go run emit_log.go "Here is the first log"
# => [x] Sent Here is the first log
go run emit_log.go "Here is the second log"
# => [x] Sent Here is the second log
The consumer receives every message the publisher sends. Run as many receivers as you like — they'll each get their own copy of the messages.
Received logs will appear in the receiving terminals as:
go run receive_logs.go
# => [*] Waiting for logs. To exit press CTRL+C
# => [x] Here is the first log
# => [x] Here is the second log
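As mentioned earlier, one receiver can direct the log stream to disk while another prints it in the terminal. Plain shell output redirection is enough for the disk copy (the file name here is arbitrary):

```shell
go run receive_logs.go > logs_from_rabbit.log
```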
Now we can move on to tutorial 4 and learn how to route messages based on routing keys.