Stream Client Connections
Overview
This companion guide to the main guide on RabbitMQ streams covers how RabbitMQ Stream Protocol clients can connect to a cluster to consume from and publish to streams.
The Stream Protocol has important differences from the other protocols supported by RabbitMQ, such as AMQP 1.0, AMQP 0-9-1, MQTT and STOMP.
With streams, understanding the basics of the protocol and what client libraries can do is essential when cluster deployments involve extra layers like containers and load balancers.
Streams are optimized for maximum throughput, so the topic of data locality and client connections becomes significantly more important to cover in detail.
Stream Topology and What it Means for Publishers and Consumers
How messaging protocol clients connect to cluster nodes is covered in the Clustering guide.
A stream is replicated and persistent. It is composed of a leader (the primary member) and followers (secondary members, also called replicas). These members are distributed across multiple nodes of a RabbitMQ cluster, as shown in the following diagram:
Only the leader handles write operations, such as adding inbound messages to the stream. Any member of the stream, whether the leader or a follower, can be used for read operations, that is, delivering (dispatching) messages to client applications.
Publishers
An application that publishes to a stream using the stream protocol can connect to any node in the cluster: messages will automatically be routed from the node handling the client connection to the node that hosts the leader process.
However, in this case traffic routing will not be optimal if the connection and the stream leader are not on the same node. For best data locality and efficiency, an application that publishes to a stream should connect to the node that hosts the leader of the stream, to avoid an extra network hop.
Consumers
The behavior differs for consuming applications. With the RabbitMQ Stream Protocol, messages are delivered (dispatched) to applications using the sendfile system call: file chunks that contain messages are sent directly from the node file system to the network socket, without going through user space.
This optimization is crucial to stream efficiency. However, it also requires that the node the consuming application is connected to hosts a member of the stream. Whether this member is the leader or a replica does not matter, as long as the data is on the file system, ready to be moved to the socket by the kernel executing a sendfile system call.
This constraint for consuming applications is manageable in most cases. In the diagram above, each node hosts a member of the stream, so an application can connect to any node to consume. However, consider a 5-node cluster with streams using a replication factor of 2: each stream will have members on only 3 of the 5 nodes.
In this case, consuming applications must select their connection node appropriately.
Best Practices for Publishers and Consumers
Publishing applications can connect to any node of a cluster and will always reach the leader process. Consuming applications must connect to a node that hosts a member of the target stream, where this member can be either the leader or a follower. The following best practices should be enforced whenever possible:
- Publishing applications should always connect to the node that hosts the leader process of the target stream
- Consuming applications should always connect to a node that hosts a replica of the target stream
The following diagram illustrates these best practices:
Connecting directly to the node of the stream leader avoids a network hop, as published messages must ultimately reach the leader. Using a replica for consuming relieves the leader of some load, allowing it to spend more resources handling write operations.
These best practices are integrated into the official RabbitMQ Stream Protocol client libraries, keeping these details from complicating application code.
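For example, with the stream Java client, an application only provides one or several entry points and the name of the target stream; the library performs the topology discovery and node selection on its own. The following snippet is a minimal sketch (the classes come from the com.rabbitmq.stream package; the entry point and stream name are assumptions for illustration):

// any reachable cluster node can serve as the entry point; the Environment
// discovers the stream topology and picks the appropriate nodes internally
Environment environment = Environment.builder()
    .host("node-1")
    .port(5552)
    .build();

// the producer ends up connected to the node hosting the leader of the stream
Producer producer = environment.producerBuilder()
    .stream("my-stream")
    .build();

// the consumer ends up connected to a node hosting a replica when one is available
Consumer consumer = environment.consumerBuilder()
    .stream("my-stream")
    .messageHandler((context, message) -> {
        // process the message
    })
    .build();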
The stream protocol allows client libraries (and applications) to discover the topology of a given stream through the metadata command.
Stream Distribution Across Cluster Nodes
Before examining the metadata command of the stream protocol, it is important to understand how streams distribute across the nodes of a RabbitMQ cluster.
A stream has a leader Erlang process located on one node and replica Erlang processes located on other nodes.
With multiple streams, the leader and follower processes are spread across the cluster nodes.
With the exception of single node clusters, no single RabbitMQ node should host all the stream leaders.
A set of stream members (replicas) can be thought of as a small cluster within the RabbitMQ cluster, as illustrated with several streams in the following diagram:
The distribution of leaders across the cluster depends on the leader locator strategy in effect at stream declaration time.
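To check where the members of a given stream ended up, the rabbitmq-streams CLI tool can report the status of a stream. The command below is a sketch: the stream name is an assumption, and the exact output depends on the RabbitMQ version:

# reports the members of the stream, their role (leader or replica) and the node hosting each of them
rabbitmq-streams stream_status my-stream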
Stream Topology Discovery Using the metadata Command
The stream protocol provides a metadata command that allows clients to query the topology of one or several streams. For each queried stream, the response contains the hostname and port of the nodes that host the leader and replicas.
The following diagram illustrates how a client application already connected to one of the nodes can discover the topology of a given stream:
A common pattern is to provide one or several node endpoints to a client library, use the metadata command once connected to discover the topology of the target stream, and then connect to the appropriate nodes depending on the operations (publishing or consuming).
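The node selection logic itself is straightforward once the topology is known. The following Java sketch is purely illustrative: the Node and StreamTopology records are hypothetical stand-ins for the information the metadata command returns, not types from any client library:

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// hypothetical types: stand-ins for what the metadata command returns
record Node(String host, int port) {}
record StreamTopology(Node leader, List<Node> replicas) {}

class NodeSelection {

    // publishing: connect to the node hosting the leader to avoid an extra network hop
    static Node forPublishing(StreamTopology topology) {
        return topology.leader();
    }

    // consuming: prefer a replica to offload the leader; fall back to the leader
    // if the stream has no replica (for example, with a replication factor of 1)
    static Node forConsuming(StreamTopology topology) {
        List<Node> replicas = topology.replicas();
        if (replicas == null || replicas.isEmpty()) {
            return topology.leader();
        }
        return replicas.get(ThreadLocalRandom.current().nextInt(replicas.size()));
    }
}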
The metadata command is essential for client libraries to enforce the best practices mentioned above.
Unfortunately, the metadata returned with the default configuration will not always be accurate, or at least not accurate enough for the client application to connect successfully.
Limitations of the Metadata Command
By default, a node returns its hostname for the host metadata (more specifically, the host part of the node name, the {hostname} part in rabbit@{hostname}).
This works as long as the client can resolve the hostname of the target node.
However, when RabbitMQ nodes are deployed in containerized environments, the hostname can be ambiguous and may not resolve on the hosts where applications are deployed.
The following diagram illustrates a 3-node RabbitMQ cluster where the nodes are containers running on different VMs. A client application can connect to the nodes if the ports are mapped correctly, but cannot do so using the hostname of the containers.
A RabbitMQ node with the stream plugin enabled does its best, but it cannot know which hostnames client applications can or cannot resolve.
Fortunately, it is possible to configure what a node returns when asked for its "coordinates" for the metadata command.
Tuning the Metadata Command: Advertised Host and Port
The advertised_host and advertised_port configuration entries of the stream plugin should be used to specify what a node returns when asked how to be contacted. The plugin will return these values as given, without any validation. The DNS setup must allow client applications to connect to the node using these configured values; in practice this means that the overridden advertised hostnames must be stable and resolvable by application hosts.
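For example, the following rabbitmq.conf snippet makes a node advertise a host and port that client applications are known to resolve and reach (the values are placeholders to adapt to the actual environment):

# stream plugin configuration in rabbitmq.conf
# these values are returned as-is by the metadata command,
# so they must be resolvable and reachable by client applications
stream.advertised_host = rabbitmq-1.internal.example.com
stream.advertised_port = 12345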
The advertised_host and advertised_port settings should resolve connection issues where client applications cannot connect to nodes due to the hostnames advertised by default. These settings are important to consider when deploying a RabbitMQ cluster with containerized nodes and streams. When RabbitMQ nodes use hostnames that applications cannot resolve, using the advertised_host and advertised_port settings becomes essential.
There remains one common use case where this discovery mechanism can be problematic: when a load balancer sits between client applications and the cluster nodes.
Connecting to Nodes Behind a Load Balancer
Having a load balancer in front of a RabbitMQ cluster is a common scenario. A load balancer can make the data locality problem outlined above much worse. Fortunately, solutions exist.
When using the metadata command with a load balancer, an issue arises: the client receives the node information and uses it to connect directly to the nodes, bypassing the load balancer. The following diagram illustrates this situation:
This behavior is usually undesirable.
Setting the advertised_host and advertised_port configuration entries to the load balancer information, so that client applications always connect to the load balancer, is not recommended. This approach prevents enforcing the best practices (publishing to the leader, consuming from a replica), and in deployments where streams do not have members on all nodes, consuming will fail if the application ends up connected to a node without a stream member.
Client libraries can implement a workaround to resolve this problem.
Client Workaround With a Load Balancer
A client application can always connect to the load balancer and end up connected to the appropriate node using the following approach:
- Use the metadata command but intentionally ignore the discovered result and always connect to the load balancer
- Retry connecting until connected to an appropriate node
The "coordinates" of the node (hostname and port, or advertised_host
and advertised_port
if configured) are available in a stream protocol connection.
A client application can determine to which node it is connected.
The "coordinates" of a node that the metadata
command returns are not used to connect in this case, as the client always connects to the load balancer.
They are used to correlate the connection the load balancer provides with the node the client expects, and the hostname is sufficient for this purpose.
This means advertised_host and advertised_port should not be configured when a load balancer is in use.
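Put together, the workaround boils down to a small retry loop. The following Java sketch is purely illustrative: the StreamConnection interface and the connectViaLoadBalancer supplier are hypothetical, standing in for what a client library does internally:

import java.util.function.Supplier;

// hypothetical connection abstraction: a client library knows the node "coordinates"
// (hostname, or advertised_host if configured) of the connection it just opened
interface StreamConnection extends AutoCloseable {
    String nodeHostname();
    void close();
}

class LoadBalancerWorkaround {

    // keeps connecting through the load balancer until the connection lands
    // on the expected node (known from a previous metadata query);
    // a real implementation would bound the number of attempts
    static StreamConnection connectToExpectedNode(
            String expectedHostname, Supplier<StreamConnection> connectViaLoadBalancer) {
        while (true) {
            StreamConnection connection = connectViaLoadBalancer.get();
            if (expectedHostname.equals(connection.nodeHostname())) {
                return connection; // connected to the right node, keep this connection
            }
            connection.close(); // wrong node behind the load balancer, retry
        }
    }
}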
Consider the following scenario:
- A publishing application knows the leader of its targeted stream is on node-1, thanks to the response of a metadata request
- It creates a new connection using the load balancer address
- The load balancer chooses to connect to node-3
- The connection is properly established, but the client application discovers it is connected to node-3, so it immediately closes the connection and retries
- The load balancer chooses node-1 on the next attempt
- The application is connected to the correct node and proceeds with publishing using this connection
The following diagram illustrates this process:
As stream connections are meant to be long-lived and stream applications typically have little connection churn, retrying to connect does not create problematic churn and is not a concern.
This solution assumes that the load balancer will not always connect to the same backend server. Round robin is an appropriate balancing strategy for this case.
Setting advertised_host and advertised_port is not necessary when using this technique, and setting them to the load balancer coordinates for all nodes can be impossible or difficult to achieve. Allowing each node to return its hostname is appropriate here, as the hostname should be unique within a network.
This responsibility lies with the client library. The following section describes how this is implemented with the stream Java client.
Using the Stream Java Client With a Load Balancer
The stream Java client provides an AddressResolver extension point. It is used whenever a new connection is created: from the passed-in Address (the node to connect to based on the metadata query), the address resolver can provide logic to compute the actual address to use. The default implementation returns the given address. To implement the workaround presented above when a load balancer is in use, always return the address of the load balancer, as shown in the following code snippet:
Address entryPoint = new Address("my-load-balancer", 5552);
Environment environment = Environment.builder()
    .host(entryPoint.host())
    .port(entryPoint.port())
    // ignore the address computed from the metadata query,
    // always connect to the load balancer instead
    .addressResolver(address -> entryPoint)
    .build();
The stream PerfTest tool also supports this mode when the --load-balancer option is enabled. The following commands configure the tool to always use the same entry point for publisher and consumer connections:
# with the Java binary
java -jar stream-perf-test.jar --uris rabbitmq-stream://my-load-balancer:5552 --load-balancer
# with Docker
docker run -it --rm pivotalrabbitmq/stream-perf-test --uris rabbitmq-stream://my-load-balancer:5552 --load-balancer
Best Practices, Summarized
Client applications connecting using the stream protocol should follow these guidelines:
- Publishing applications should connect to the node that hosts the leader of the target stream
- Consuming applications should connect to a node that hosts a replica of the target stream
- Client applications must use the metadata stream protocol command to learn about the topology of the streams they want to interact with
- The stream Java and Go clients enforce these best practices
- The metadata command returns the node's hostname and listener port by default, which can be problematic in containerized environments
- The advertised_host and advertised_port configuration entries allow specifying what values a node should return for the metadata command
- A load balancer can confuse a client library, which may try to bypass it and connect directly to the nodes
- Client libraries can provide a workaround to work properly with a load balancer
- The stream Java and Go clients implement such a workaround