Stream Plugin


Streams are a persistent, replicated data structure introduced in RabbitMQ 3.9 that models an append-only log with non-destructive consumer semantics. They can be used as a regular AMQP 0-9-1 queue or through a dedicated binary protocol plugin and its associated clients.

This page covers the Stream plugin, which allows clients to interact with streams using this binary protocol. For an overview of stream concepts and how to operate streams, see the guide on RabbitMQ streams.

Client libraries for the stream protocol are available on several platforms: Java, Go, .NET, Python (rbfly, rstream), Erlang, Elixir, Rust.

Enabling the Plugin

The Stream plugin is included in the RabbitMQ distribution. Before clients can successfully connect, it must be enabled using rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_stream

Plugin Configuration

TCP Listeners

When no configuration is specified, the Stream plugin listens on all interfaces on port 5552, with a default username and password of guest/guest.

The port the stream listener uses can be changed in rabbitmq.conf.

Below is a minimal configuration file which changes the listener port to 12345:

stream.listeners.tcp.1 = 12345

while one which changes the listener to listen only on localhost (for both IPv4 and IPv6) would look like:

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
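
After changing listeners, it can help to confirm from the client side that the expected addresses accept TCP connections. The following is a minimal sketch using only the Python standard library; the helper name is_listening is hypothetical, and the check only opens and closes a TCP connection. It does not speak the stream protocol or authenticate.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host and handles IPv4 and IPv6.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable addresses.
        return False


if __name__ == "__main__":
    # Addresses and port taken from the localhost-only example above.
    for host in ("127.0.0.1", "::1"):
        print(host, is_listening(host, 5552))
```

With the localhost-only configuration, both addresses should report a listener when run on the broker host, while the same check from another machine should fail.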

TCP Listener Options

The plugin supports TCP listener option configuration.

The settings use a common prefix, stream.tcp_listen_options, and control things such as TCP buffer sizes, inbound TCP connection queue length, whether TCP keepalives are enabled and so on. See the Networking guide for details.

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf  = 131072
stream.tcp_listen_options.sndbuf  = 131072

stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay   = true

stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout  = 120


It is possible to set the maximum size of frames (default is 1 MiB) and the heartbeat (default is 60 seconds), if needed:

stream.frame_max = 2097152 # in bytes
stream.heartbeat = 120 # in seconds
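
Because stream.frame_max is expressed in bytes, it is easy to mistype a value. As a small illustration, the sketch below converts a size in MiB to bytes and renders the two settings; the helper names mib_to_bytes and render_limits are hypothetical and not part of RabbitMQ.

```python
def mib_to_bytes(mib: int) -> int:
    """Convert mebibytes to bytes (1 MiB = 1024 * 1024 bytes)."""
    return mib * 1024 * 1024


def render_limits(frame_max_mib: int, heartbeat_seconds: int) -> str:
    """Render frame_max (in bytes) and heartbeat (in seconds) as rabbitmq.conf lines."""
    return (
        f"stream.frame_max = {mib_to_bytes(frame_max_mib)}\n"
        f"stream.heartbeat = {heartbeat_seconds}"
    )


if __name__ == "__main__":
    # 2 MiB frames and a 120 second heartbeat, as in the example above.
    print(render_limits(2, 120))
```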

Flow Control

Fast publishers can overwhelm the broker if it cannot keep up with writing and replicating inbound messages. To prevent this, each connection is allowed a maximum number of outstanding unconfirmed messages before it is blocked (initial_credits, defaults to 50,000). The connection is unblocked once a given number of messages has been confirmed (credits_required_for_unblocking, defaults to 12,500). You can change these values to suit your workload:

stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000

High values for these settings can improve publishing throughput at the cost of higher memory consumption, which can eventually crash the broker. Low values help the broker cope with many moderately fast publishing connections.
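
To make the interaction between the two settings concrete, here is a toy model of the credit mechanism described above. It is only a sketch under simplifying assumptions: the class name CreditGate and its methods are hypothetical, one credit is consumed per message, and the connection is unblocked once available credits reach credits_required_for_unblocking. The broker's actual accounting may differ in detail.

```python
class CreditGate:
    """Toy model of per-connection publish credits (not the broker's code)."""

    def __init__(self, initial_credits: int = 50_000,
                 credits_required_for_unblocking: int = 12_500) -> None:
        self.initial_credits = initial_credits
        self.credits_required_for_unblocking = credits_required_for_unblocking
        self.credits = initial_credits
        self.blocked = False

    def publish(self) -> bool:
        """Try to publish one message; return False if the connection is blocked."""
        if self.blocked:
            return False
        self.credits -= 1
        if self.credits <= 0:
            # No credits left: stop accepting messages until confirms arrive.
            self.blocked = True
        return True

    def confirm(self, count: int = 1) -> None:
        """Record `count` confirmed messages and give their credits back."""
        self.credits = min(self.initial_credits, self.credits + count)
        if self.blocked and self.credits >= self.credits_required_for_unblocking:
            self.blocked = False


if __name__ == "__main__":
    gate = CreditGate(initial_credits=4, credits_required_for_unblocking=2)
    accepted = sum(gate.publish() for _ in range(6))
    print("accepted:", accepted, "blocked:", gate.blocked)
    gate.confirm(2)
    print("blocked after 2 confirms:", gate.blocked)
```

In this model a larger initial_credits value lets more unconfirmed messages accumulate before blocking, which is where the extra memory consumption comes from.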

Advertised Host and Port

The stream protocol allows clients to discover the topology of streams, that is, where the leader and replicas for a given set of streams are located in the cluster. The client can then connect to the appropriate node to interact with a stream: the leader node to publish, a replica to consume. By default, nodes return their hostname and listener port. This works in most situations, but not all (for example, when a proxy sits between the cluster nodes and the clients, or when cluster nodes and/or clients run in containers).

The advertised_host and advertised_port keys specify what a broker node returns when asked for the topology of streams. Set them according to your infrastructure so that clients can connect to cluster nodes:

stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345

The Connecting to Streams blog post covers why the advertised_host and advertised_port settings are necessary in some deployments.
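
The node selection rule described in this section (leader to publish, a replica to consume) can be sketched from the client's point of view as follows. The StreamTopology type and pick_endpoint function are hypothetical illustrations, not the API of any client library, and falling back to the leader when a stream has no replicas is an assumption of this sketch.

```python
import random
from dataclasses import dataclass, field
from typing import List, Tuple

Endpoint = Tuple[str, int]  # (advertised host, advertised port)


@dataclass
class StreamTopology:
    """Where a stream's leader and replicas live, as reported by a node."""
    leader: Endpoint
    replicas: List[Endpoint] = field(default_factory=list)


def pick_endpoint(topology: StreamTopology, purpose: str) -> Endpoint:
    """Choose the node to connect to: leader to publish, a replica to consume."""
    if purpose == "publish":
        return topology.leader
    if purpose == "consume":
        if topology.replicas:
            return random.choice(topology.replicas)
        # Assumption of this sketch: with no replicas, consume from the leader.
        return topology.leader
    raise ValueError(f"unknown purpose: {purpose!r}")


if __name__ == "__main__":
    topology = StreamTopology(
        leader=("rabbitmq-1", 12345),
        replicas=[("rabbitmq-2", 12345), ("rabbitmq-3", 12345)],
    )
    print(pick_endpoint(topology, "publish"))
    print(pick_endpoint(topology, "consume"))
```

The endpoints in the topology are whatever the nodes advertise, which is why the advertised values must be reachable from the client.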

TLS Support

To use TLS for stream connections, TLS must be configured in the broker. To accept TLS stream connections, add a TLS listener for streams using the stream.listeners.ssl.* configuration keys.

The plugin uses the core RabbitMQ server certificates and key (just as the AMQP 0-9-1 and AMQP 1.0 listeners do):

ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile   = /path/to/tls/server_certificate.pem
ssl_options.keyfile    = /path/to/tls/server_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true

stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551

This configuration creates a standard TCP listener on port 5552 and a TLS listener on port 5551.

When a TLS listener is set up, it may be desirable to disable all non-TLS listeners. This can be configured as follows:

stream.listeners.tcp   = none
stream.listeners.ssl.1 = 5551
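
One way to check the TLS listener from a client machine is to attempt a TLS handshake against it. The sketch below uses only the Python standard library; the function names are hypothetical, and the file paths passed in are placeholders to be replaced with your own. Since the server configuration above sets verify_peer and fail_if_no_peer_cert, the client is assumed to need a certificate and key of its own. The sketch only performs the handshake and does not speak the stream protocol.

```python
import socket
import ssl
from typing import Optional


def make_client_context(cafile: Optional[str] = None,
                        certfile: Optional[str] = None,
                        keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a context that verifies the server and optionally presents a client cert."""
    # With cafile=None the system's default CA certificates are used.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    if certfile:
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


def tls_handshake_ok(host: str, port: int = 5551,
                     context: Optional[ssl.SSLContext] = None,
                     timeout: float = 5.0) -> bool:
    """Return True if a TLS handshake with (host, port) succeeds."""
    context = context or make_client_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            # wrap_socket performs the handshake and verifies the hostname.
            with context.wrap_socket(raw, server_hostname=host):
                return True
    except OSError:
        # ssl.SSLError is a subclass of OSError, so TLS failures land here too.
        return False
```

A call such as tls_handshake_ok("rabbitmq-1", 5551, make_client_context("/path/to/tls/ca_certificate.pem", "/path/to/tls/client_certificate.pem", "/path/to/tls/client_key.pem")) would then report whether the listener is reachable and the certificates are accepted; the host name and client file paths here are placeholders.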

As with plain connections, it is possible to configure the advertised TLS host and port. When TLS is used, the plugin returns the following metadata by default:

  • hostname: the advertised_host if it is set, otherwise the node's hostname
  • port: the current TLS port

It is possible to override this behavior by setting the advertised_tls_host and advertised_tls_port configuration entries, together or individually:

stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344
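
The precedence rules described in this guide for advertised hosts and ports can be summarized in a short sketch. The function advertised_endpoint and the dictionary-based settings are hypothetical illustrations of those rules, not the plugin's implementation.

```python
from typing import Dict, Tuple, Union

Settings = Dict[str, Union[str, int]]


def advertised_endpoint(settings: Settings, node_hostname: str,
                        listener_port: int, tls: bool = False) -> Tuple[str, int]:
    """Resolve the (host, port) a node would advertise for a listener.

    listener_port is the port of the listener in question: the plain TCP
    port when tls is False, the TLS port when tls is True.
    """
    if tls:
        # TLS host: advertised_tls_host, then advertised_host, then hostname.
        host = (settings.get("stream.advertised_tls_host")
                or settings.get("stream.advertised_host")
                or node_hostname)
        # TLS port: advertised_tls_port, otherwise the current TLS port.
        port = settings.get("stream.advertised_tls_port") or listener_port
    else:
        host = settings.get("stream.advertised_host") or node_hostname
        port = settings.get("stream.advertised_port") or listener_port
    return str(host), int(port)


if __name__ == "__main__":
    settings = {
        "stream.advertised_host": "private-rabbitmq-1",
        "stream.advertised_port": 12345,
        "stream.advertised_tls_host": "public-rabbitmq-1",
        "stream.advertised_tls_port": 12344,
    }
    print(advertised_endpoint(settings, "node-1", 5552))
    print(advertised_endpoint(settings, "node-1", 5551, tls=True))
```

Here node-1 stands in for the node's own hostname and is only a placeholder.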
