Configuring Static Shovels

Overview

This guide focuses on statically configured shovels. It assumes familiarity with the key concepts behind the Shovel plugin.

Unlike dynamic shovels, static shovels are configured using the advanced configuration file. They are started on node boot and are primarily useful for permanently running workloads. Any change to static shovel configuration requires a node restart, which makes them highly inflexible.

Most users should prefer dynamic shovels to static ones for their flexibility and ease of automation. Generating a dynamic shovel definition (a JSON document) is usually easier compared to a static shovel definition (which uses Erlang terms).

Configuration

The configuration for the Shovel plugin must be defined in the advanced configuration file.

It consists of a single shovels clause that lists the shovels that should be started on node boot:

{rabbit, [
  %% ...
]},

{rabbitmq_shovel, [
  {shovels, [
    {shovel_one, [
      %% shovel_one properties ...
    ]},
    %% ...
  ]}
]}

A (deliberately verbose) example configuration can be found below.

Each element of the shovels clause is a named static shovel. The names in the list must be distinct.
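
For orientation, the advanced configuration file as a whole is a single Erlang list of per-application sections, terminated by a period. The sketch below shows that outer shape with two distinct shovel names. The names shovel_one and shovel_two are placeholders, and the property lists are left elided, so this skeleton shows structure only and is not a bootable configuration.

```erlang
%% advanced.config: one list of application sections, terminated by a period
[
  {rabbitmq_shovel, [
    {shovels, [
      %% shovel names are atoms and must be distinct
      {shovel_one, [
        %% shovel_one properties ...
      ]},
      {shovel_two, [
        %% shovel_two properties ...
      ]}
    ]}
  ]}
].
```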

A shovel definition looks like this at the top level:

{shovel_name, [
  {source, [ ...protocol specific config... ]},
  {destination, [ ...protocol specific config... ]},
  {ack_mode, a_mode},
  {reconnect_delay, reconn_delay}
]}

where shovel_name is the name of the shovel (an Erlang atom). The name must be enclosed in single quotes (') if it does not begin with a lower-case letter or if it contains characters other than alphanumeric characters, underscore (_), or @.
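
To illustrate the quoting rule, here are a few shovel names with and without quotes. All of the names are made up for this example.

```erlang
%% no quotes needed: starts with a lower-case letter and uses only
%% alphanumeric characters, underscores, and @
{orders_shovel_1, [
  %% ...
]},

%% quotes required: starts with an upper-case letter
{'Orders_shovel', [
  %% ...
]},

%% quotes required: contains a hyphen
{'orders-shovel', [
  %% ...
]}
```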

The Source and The Destination

A shovel transfers messages from a source to a destination.

The source and destination keys are mandatory and contain nested protocol-specific keys. Two protocols are currently supported: AMQP 0-9-1 and AMQP 1.0. Source and destination do not have to use the same protocol. All the other properties are optional.

source has different properties for different protocols. Two properties are common across all protocols: protocol and uris. protocol supports two values: amqp091 and amqp10, for AMQP 0-9-1 and AMQP 1.0, respectively:

%% for AMQP 0-9-1
{protocol, amqp091}
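
The AMQP 1.0 counterpart uses the other value named above:

```erlang
%% for AMQP 1.0
{protocol, amqp10}
```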

uris is a list of AMQP connection URIs:

{uris, [
        "amqp://fred:secret@host1.domain/my_vhost",
        "amqp://john:secret@host2.domain/my_vhost"
       ]}

The URI syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference for the parameters available to static shovels, such as the TLS certificate and private key.
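
As a sketch of what such a query part can look like, the URI below requests a TLS connection with a client certificate. It assumes the parameter names cacertfile, certfile, keyfile, verify, and server_name_indication from the query parameter reference. The host name and all file paths are placeholders, so check the reference for the exact set supported by your RabbitMQ version.

```erlang
{uris, [
        %% TLS (amqps) connection; every path below is a placeholder
        "amqps://host1.domain/my_vhost?cacertfile=/path/to/ca_certificate.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_peer&server_name_indication=host1.domain"
       ]}
```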

General Source Keys

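Some keys apply regardless of the protocol used by the source. As the shovel definition shown earlier illustrates, ack_mode and reconnect_delay are set at the top level of the shovel, next to source and destination. They are described in the table below.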

General Static Shovel Keys (Properties)
Key Description
reconnect_delay

The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1.

{reconnect_delay, 5}

would delay for five seconds before reconnecting after a failure. A value of 0 means no reconnection: the shovel will stop after the first failure or unsuccessful connection attempt.

ack_mode

Determines how the shovel should acknowledge consumed messages. If set to on_confirm (the default), messages are acknowledged to the source broker after they have been confirmed by the destination. This handles network errors and broker failures without losing messages, and is the slowest option.

If set to on_publish, messages are acknowledged to the source broker after they have been published at the destination. This handles network errors without losing messages, but may lose messages in the event of broker failures.

If set to no_ack, message acknowledgements are not used. This is the fastest option, but may lose messages in the event of network or broker failures.
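
In the configuration file the acknowledgement modes are written as Erlang atoms with underscores, as in the {ack_mode, on_confirm} entry of the example configurations in this guide. The other two atoms shown here, on_publish and no_ack, are assumed to follow the same pattern. Only one ack_mode entry is used per shovel.

```erlang
%% safest and slowest (the default): acknowledge after the destination confirms
{ack_mode, on_confirm}

%% acknowledge as soon as the message has been published at the destination
{ack_mode, on_publish}

%% fastest: no acknowledgements at all
{ack_mode, no_ack}
```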

AMQP 0-9-1 Source Keys

AMQP 0-9-1-specific source keys are covered in a separate table:

AMQP 0-9-1 Source Keys (Properties)
Key Description
declarations

An optional list of AMQP 0-9-1 operations to be executed by the Shovel before it starts transferring messages. They are typically used to set up the topology.

  {declarations, declaration_list}

The declarations follow method and property names used by the RabbitMQ Erlang Client.

A minimalistic declaration example:

  {declarations, [
                   'queue.declare',
                   {'queue.bind', [
                                    {exchange, <<"my_exchange">>},
                                    {queue,    <<>>}
                                  ]}
                 ]}

will first declare an anonymous queue, and then bind it to the exchange called "my_exchange". The queue name of <<>> on method queue.bind means "use the queue last declared on this channel".

Each element of the declaration list is either an AMQP 0-9-1 method given as a single-quoted atom such as 'queue.declare', or a tuple whose first element is the method atom and whose second element is a property list of parameters.

If just the method name is used, all the parameters take their defaults (as illustrated with 'queue.declare' above).

If a tuple with a property list is supplied, the properties in the list specify some or all of the parameters explicitly.

Here is another example:

{'exchange.declare', [
                      {exchange, <<"my_exchange">>},
                      {type, <<"direct">>},
                      durable
                     ]}

will declare a durable, direct exchange called "my_exchange".

queue

The name of the source queue as an Erlang binary value. This property is mandatory:

{queue, <<"queue.1">>}

queue.1 is the name of the queue to shovel messages from, as a binary string.

This queue must exist. Use the resource declarations covered above to declare the queue or ensure it exists. If the value is <<>> (the empty binary string) then the most recently declared queue in declarations is used. This allows anonymous queues to be declared and used.

prefetch_count

The maximum number of unacknowledged messages copied over a shovel at any one time. Default is 1000:

{prefetch_count, 1000}
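
Combining the keys above, a source that consumes from a named queue might be sketched as follows. The declaration ensures the queue exists before the shovel starts consuming. The URI and the choice to make the queue durable are assumptions made for this example.

```erlang
{source, [
  {protocol, amqp091},
  {uris, ["amqp://fred:secret@host1.domain/my_vhost"]},
  %% make sure the queue exists before consuming from it
  {declarations, [
    {'queue.declare', [
      {queue, <<"queue.1">>},
      durable
    ]}
  ]},
  %% the queue to shovel messages from
  {queue, <<"queue.1">>},
  {prefetch_count, 1000}
]}
```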

AMQP 1.0 Source Keys

AMQP 1.0 source settings are different from those of AMQP 0-9-1 sources.

AMQP 1.0 Source Keys (Properties)
Key Description
source_address

This represents the source address of the AMQP 1.0 link. This key is mandatory:

{source_address, <<"my-address">>}

prefetch_count

This optional key sets the link credit amount that will be granted to the receiving link. The credit will be automatically renewed when it falls below a tenth of this value. The default is 1000. It takes the form

{prefetch_count, 10}
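
Put together, an AMQP 1.0 source might be sketched like this. The URI is reused from the earlier uris example and the address is a placeholder.

```erlang
{source, [
  {protocol, amqp10},
  {uris, ["amqp://fred:secret@host1.domain/my_vhost"]},
  {source_address, <<"my-address">>},
  %% grant 10 credits; per the description above, credit is renewed
  %% once it falls below a tenth of this value
  {prefetch_count, 10}
]}
```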

Destination

destination is a mandatory key and has different properties for different protocols. Two properties are common across all protocols: protocol and uris. protocol supports two values: amqp091 and amqp10, for AMQP 0-9-1 and AMQP 1.0, respectively:

%% for AMQP 0-9-1
{protocol, amqp091}

uris is a list of AMQP connection URIs:

{uris, [
        "amqp://fred:secret@host1.domain/my_vhost",
        "amqp://john:secret@host2.domain/my_vhost"
       ]}

The URI syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference for the parameters available to static shovels, such as the TLS certificate and private key.

General Destination Keys

General Destination Keys (Properties)
Key Description
reconnect_delay

The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1.

{reconnect_delay, 5}

would delay for five seconds before reconnecting after a failure. A value of 0 means no reconnection: the shovel will stop after the first failure or unsuccessful connection attempt.

AMQP 0-9-1 Destination Keys

AMQP 0-9-1 Destination Keys (Properties)
Key Description
publish_properties

This optional key controls message properties set or overridden by the shovel. It takes the following form:

{publish_properties, [
  {delivery_mode, 2}
]}

where the properties in the list are set on the basic properties of each message before it is re-published.

This specific example would mark all re-published messages as persistent:

{publish_properties, [
  {delivery_mode, 2}
]}

By default the original properties of the message are preserved, but this clause can be used to change or set any known property:

  • content_type
  • content_encoding
  • headers
  • delivery_mode
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id

publish_fields

This optional key is similar to publish_properties but controls the publishing settings instead of the message properties that are accessible to consumers. It takes the form of

{publish_fields, [ 
                    {exchange, <<"my_exchange">>},
                    {routing_key, <<"from_shovel">>}
                  ]}

where the properties in the list are used to set the fields on the basic.publish method used to re-publish messages.

By default the messages are re-published using the original exchange name and routing key. By specifying

{publish_fields, [ 
                    {exchange, <<"my_exchange">>},
                    {routing_key, <<"from_shovel">>}
                  ]}

messages would be re-published to an explicit exchange name with an explicit, fixed routing key.

add_timestamp_header

This boolean key controls whether a custom header, x-shovelled-timestamp, will be added to the message before it is re-published:

{add_timestamp_header, true}

The header value is the timestamp (in seconds since the epoch) of the moment the message was shovelled. By default the header is not added.

add_forward_headers

When set to true the shovel will add a number of custom message headers (shovelled-by, shovel-type, shovel-name) to provide some additional metadata about the transfer:

{add_forward_headers, true}
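
As a sketch of how these destination keys combine, the fragment below re-publishes to a single queue through the default exchange, whose name is the empty string and which routes by queue name in AMQP 0-9-1. The queue name "target.queue" and the content type are placeholders chosen for this example, and the queue is assumed to exist already.

```erlang
{destination, [
  {protocol, amqp091},
  {uris, ["amqp://"]},
  %% mark re-published messages as persistent and set a content type
  {publish_properties, [
    {delivery_mode, 2},
    {content_type, <<"application/json">>}
  ]},
  %% publish via the default exchange, which routes by queue name
  {publish_fields, [
    {exchange, <<>>},
    {routing_key, <<"target.queue">>}
  ]},
  {add_timestamp_header, true},
  {add_forward_headers, true}
]}
```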

AMQP 1.0 Destination Keys

AMQP 1.0 Destination Keys (Properties)
Key Description
target_address

This represents the target address of the sending AMQP 1.0 link:

{target_address, <<"some-address">>}

properties

This optional key controls what additional properties will be added when re-publishing messages. It takes the form of

{properties, [
  {content_type, <<"application/json">>}
]}

The available keys include message_id, user_id, to, subject, reply_to, correlation_id, content_type, content_encoding, absolute_expiry_time, creation_time. See the AMQP 1.0 spec (§3.2.4) for all the available keys and values.

application_properties

This optional key declares any additional application properties to be added when re-publishing a message. It takes the form of

{application_properties, [
  {<<"application-key-1">>, <<"value-1">>},
  {<<"application-key-2">>, <<"value-2">>}
]}

Keys and values should be binary strings as in the example above.

add_timestamp_header

This boolean key controls whether a creation_time property will be set on the message before it is re-published:

{add_timestamp_header, true}

The value is the timestamp (in seconds since the epoch) of the moment the message was shovelled. By default the property is not set.

add_forward_headers

When set to true the shovel will add application properties for the following keys: shovelled-by, shovel-type, shovel-name, to provide some additional metadata about the transfer:

{add_forward_headers, true}
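
The AMQP 1.0 destination keys above can be combined as in the following sketch. The URI, address, and property values are placeholders taken from or modelled on the snippets in this section.

```erlang
{destination, [
  {protocol, amqp10},
  {uris, ["amqp://host:5672"]},
  {target_address, <<"some-address">>},
  %% message properties set on every re-published message
  {properties, [
    {content_type, <<"application/json">>}
  ]},
  %% application properties: keys and values are binary strings
  {application_properties, [
    {<<"application-key-1">>, <<"value-1">>}
  ]},
  {add_timestamp_header, true},
  {add_forward_headers, true}
]}
```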

Example Configuration

A reasonably complete static shovel configuration between AMQP 0.9.1 endpoints might look like this:

{rabbitmq_shovel,
  [ {shovels, [ {my_first_shovel,
                  [ {source,
                      [ {protocol, amqp091},
                        {uris, [ "amqp://fred:secret@host1.domain/my_vhost",
                                  "amqp://john:secret@host2.domain/my_vhost" ]},
                        {declarations, [ {'exchange.declare',
                                            [ {exchange, <<"my_fanout">>},
                                              {type, <<"fanout">>},
                                              durable
                                            ]},
                                          {'queue.declare',
                                            [{arguments,
                                                [{<<"x-message-ttl">>, long, 60000}]}]},
                                          {'queue.bind',
                                            [ {exchange, <<"my_fanout">>},
                                              {queue,    <<>>}
                                            ]}
                                          ]},
                        {queue, <<>>},
                        {prefetch_count, 10}
                      ]},
                    {destination,
                      [ {protocol, amqp091},
                        {uris, ["amqp://"]},
                        {declarations, [ {'exchange.declare',
                                            [ {exchange, <<"my_direct">>},
                                              {type, <<"direct">>},
                                              durable
                                            ]}
                                        ]},
                        {publish_properties, [ {delivery_mode, 2} ]},
                        {add_forward_headers, true},
                        {publish_fields, [ {exchange, <<"my_direct">>},
                                          {routing_key, <<"from_shovel">>}
                                          ]}
                          ]},
                    {ack_mode, on_confirm},
                    {reconnect_delay, 5}
                  ]}
              ]}
  ]}

The configuration above defines a single shovel called 'my_first_shovel'.

'my_first_shovel' will connect to a broker on either host1 or host2 (as source), and directly to the local broker as destination. It will reconnect to the other source broker on failure, after a delay of 5 seconds.

When connected to the source it will declare a durable, fanout exchange called "my_fanout", an anonymous queue with a per-queue message TTL, and bind the queue to the exchange.

When connected to the destination (the local broker) it will declare a durable, direct exchange called "my_direct".

This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key "from_shovel". The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.

The shovel consumer will not get more deliveries if there are at least ten unacknowledged messages at any moment in time.

Example Configuration (1.0 Source - 0.9.1 Destination)

A reasonably complete shovel configuration between an AMQP 1.0 source and an AMQP 0.9.1 destination might look like this:

{rabbitmq_shovel,
 [ {shovels, [ {my_first_shovel,
                [ {source,
                   [ {protocol, amqp10},
                     {uris, [ "amqp://fred:secret@host1.domain/my_vhost" ]},
                     {source_address, <<"my-source">>},
                     {prefetch_count, 10}
                   ]},
                  {destination,
                     [ {protocol, amqp091},
                       {uris, ["amqp://"]},
                       {declarations, [ {'exchange.declare',
                                         [ {exchange, <<"my_direct">>},
                                           {type, <<"direct">>},
                                           durable
                                         ]}
                                      ]},
                       {publish_properties, [ {delivery_mode, 2} ]},
                       {add_forward_headers, true},
                       {publish_fields, [ {exchange, <<"my_direct">>},
                                          {routing_key, <<"from_shovel">>}
                                        ]}
                     ]},
                  {ack_mode, on_confirm},
                  {reconnect_delay, 5}
                ]}
             ]}
 ]}

Example Configuration (0.9.1 Source - 1.0 Destination)

A more extensive shovel configuration between an AMQP 0.9.1 source and an AMQP 1.0 destination might look like this:

{rabbitmq_shovel,
 [{shovels, [{my_first_shovel,
             [{source,
               [{protocol, amqp091},
                {uris, ["amqp://fred:secret@host1.domain/my_vhost",
                        "amqp://john:secret@host2.domain/my_vhost"]},
                {declarations, [{'exchange.declare',
                                   [{exchange, <<"my_fanout">>},
                                    {type, <<"fanout">>},
                                    durable]},
                                {'queue.declare',
                                   [{arguments,
                                      [{<<"x-message-ttl">>, long, 60000}]}]},
                                {'queue.bind',
                                   [{exchange, <<"my_fanout">>},
                                    {queue,    <<>>}
                                    ]}
                               ]},
                {queue, <<>>},
                {prefetch_count, 10}
               ]},
              {destination,
               [{protocol, amqp10},
                %% Note: for plain text SASL authentication, use
                %% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
                %% Note: this relies on the default user credentials,
                %%       which have remote access restrictions, see
                %%       https://www.rabbitmq.com/access-control.html to learn more
                {uris, ["amqp://host:5672"]},
                {properties, [{user_id, <<"my-user">>}]},
                {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
                {add_forward_headers, true},
                {target_address, <<"destination-queue">>}
               ]},
              {ack_mode, on_confirm},
              {reconnect_delay, 5}
             ]}
            ]}
 ]}
