Configuring Static Shovels (v.3.7+)

The configuration for the Shovel plugin must be defined in the advanced configuration file or using the classic config format. It is an Erlang term and consists of a single shovels clause:

  {rabbitmq_shovel, [ {shovels, [ {shovel_name, [ ... ]}, ... ]} ]}

A (deliberately verbose) example configuration is given below.

Each element of the list in the shovels clause is a named static shovel. The shovel_names in the list must be distinct.

Each shovel definition looks like this at the top level:

          {shovel_name,
              [ {source, [ ...protocol specific config... ]},
                {destination, [ ...protocol specific config... ]},
                {ack_mode, a_mode},
                {reconnect_delay, reconn_delay}
              ]}
      

where shovel_name is the name of the shovel (an Erlang atom). The clauses for source and destination are mandatory and protocol specific. Currently AMQP 0.9.1 and AMQP 1.0 are supported. Source and destination do not have to use the same protocol. All the other clauses are optional.
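
As a rough sketch of how these pieces fit together, the following shows a complete advanced configuration file containing one shovel with no optional top-level clauses. The shovel name and queue names are hypothetical, both queues are assumed to exist already, and publish_fields (which is optional) is included only so that messages are routed to a different queue on the same broker:

```erlang
%% advanced.config (sketch): one static shovel between two queues on the local broker.
%% minimal_shovel, source_queue and target_queue are placeholder names.
[
  {rabbitmq_shovel,
    [ {shovels,
        [ {minimal_shovel,
            [ {source,
                [ {protocol, amqp091},
                  {uris, ["amqp://"]},
                  {queue, <<"source_queue">>}
                ]},
              {destination,
                [ {protocol, amqp091},
                  {uris, ["amqp://"]},
                  %% Optional: publish to the default exchange, routed to target_queue.
                  {publish_fields, [ {exchange, <<>>},
                                     {routing_key, <<"target_queue">>} ]}
                ]}
            ]}
        ]}
    ]}
].
```

Because ack_mode and reconnect_delay are omitted, the defaults described below would apply.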

Each clause is fully described below.

shovel_name

The name of the shovel (an Erlang atom). Note that Erlang atoms should be enclosed in single quotes (') if they do not begin with a lower-case letter or if they contain other characters than alphanumeric characters, underscore (_), or @.
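
As a brief illustration of the quoting rule, the hypothetical shovel names below are all valid atoms, but only the first can be written without quotes:

```erlang
%% Hypothetical shovel names, shown as a list of atoms.
[ orders_shovel,       % lower-case start, alphanumerics and underscore only: no quotes needed
  'Orders_shovel',     % starts with an upper-case letter: must be quoted
  'orders-shovel'      % contains a hyphen: must be quoted
].
```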

When using the shovel with clustering, if two or more nodes both define a shovel with the same name, the shovel worker will be started on just one of them. However if that node fails then the shovel will be recreated on a surviving node. It is important that if two or more nodes do define a shovel with the same name, then the configuration given for that shovel should be identical for all of them.

ack_mode

This clause is optional. In

  {ack_mode, a_mode}

a_mode is one of 'no_ack', 'on_publish' or 'on_confirm'.

'no_ack'

indicates that no message acknowledgements are to be generated by the shovel (the broker automatically acknowledges all delivered messages);

'on_publish'

indicates that a message acknowledgement is to be sent (to the source broker) after each message is re-published to the destination;

'on_confirm'

indicates that publish confirmations are sought and that a message acknowledgement is to be sent (to the source broker) after each message publication is confirmed by the destination broker.

The default is 'on_confirm', which is highly recommended. If other options are chosen performance may improve slightly, but messages are more likely to be lost in the event of failures.

reconnect_delay

This clause is optional. In

  {reconnect_delay, reconn_delay}

reconn_delay is the number of seconds to wait before reconnecting in the event of connection failure (a non-negative number). For example:

  {reconnect_delay, 1.5}

would delay for one and a half seconds before reconnecting after failure.

If reconn_delay is 0, then no reconnections occur: the shovel will stop after the first failure.

The default reconn_delay is 5 (seconds).

source

Source is a mandatory clause and has different properties for different protocols. Two properties are common across all protocols:

  {source,
      [
       {protocol, amqp091 | amqp10},
       {uris, uri_list}
      ]}

protocol

This clause is mandatory and can currently take one of two values:

amqp091 or amqp10

uris

This clause is mandatory. In

  {uris, uri_list}

uri_list is a list of URI broker connections (for the basic syntax of amqp091, see AMQP URI), for example:

    [ "amqp://fred:secret@host1.domain/my_vhost"
    , "amqp://john:secret@host2.domain/my_vhost"
    ]

If the host is omitted (which is not valid in a general AMQP URI) and the protocol is amqp091, the shovel uses a direct connection to the broker on which it is running. This avoids using the network stack.

The syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference for the Erlang client's extensions (including those for SSL and SASL) which are available to the shovel.
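
The two forms can be sketched as follows. The hostname, credentials and heartbeat value are placeholders, %2f is the URI encoding of the default virtual host "/", and heartbeat is one of the parameters listed in the query parameter reference:

```erlang
%% Alternative 1: no host, so the shovel connects directly to the local broker
%% (amqp091 only).
{uris, ["amqp://"]}

%% Alternative 2: a network connection with an extra connection parameter
%% passed in the query part. Host and credentials are hypothetical.
{uris, ["amqp://guest:guest@broker.example.com/%2f?heartbeat=30"]}
```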

AMQP 0.9.1 source

declarations

This clause is optional. In

  {declarations, declaration_list}

the declaration_list is a list of AMQP methods (in the style of the Erlang client) which can be sent to the broker after connection and before shovelling.

This allows any resources that may need to be set up to be configured, including the source queue and the destination exchanges. For example:

  {declarations, [ 'queue.declare',
                   {'queue.bind', [ {exchange, <<"my_exchange">>},
                                    {queue,    <<>>}
                                  ]}
                 ]}

will first declare an anonymous queue, and then bind it to the exchange called "my_exchange". (The queue parameter <<>> on queue.bind means 'use the queue last declared on this channel'.)

Each element of the list is either an atom, being the name of an AMQP method, or a tuple with first element the method atom, and second element a property-list of parameter settings.

If just the AMQP method atom is supplied all the parameters take their defaults (as illustrated with 'queue.declare' above).

If a tuple and property-list is supplied, then the properties in the list specify some or all of the parameters explicitly.

Here is another example:

  {'exchange.declare', [ {exchange, <<"my_exchange">>},
                         {type, <<"direct">>},
                         durable
                       ]}

will declare a durable, direct exchange called "my_exchange".

For full details, consult the Erlang Client documentation.
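
Putting the two element styles together, a declaration list for a named (rather than anonymous) queue might look like the sketch below. The exchange name, queue name and routing key are hypothetical:

```erlang
%% Declares a durable topic exchange and a durable named queue, then binds them.
%% Methods are sent to the broker in the order listed.
{declarations, [ {'exchange.declare', [ {exchange, <<"orders">>},
                                        {type, <<"topic">>},
                                        durable
                                      ]},
                 {'queue.declare',    [ {queue, <<"orders_eu">>},
                                        durable
                                      ]},
                 {'queue.bind',       [ {exchange, <<"orders">>},
                                        {queue, <<"orders_eu">>},
                                        {routing_key, <<"eu.#">>}
                                      ]}
               ]}
```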

queue

This clause is mandatory. In

  {queue, queue_name}

queue_name is the name of the queue (as a binary string) to shovel messages from. For example:

  {queue, <<"my_work_queue">>}

This queue must exist. Use the resource declarations to create the queue (or ensure it exists) first. If queue_name is <<>> (the empty binary string) the most recently declared queue in declarations is used. This allows anonymous queues to be declared and used.

prefetch_count

This clause is optional. In

  {prefetch_count, count}

count is the maximum number of unacknowledged messages the shovel may hold at a time (a non-negative integer). For example:

  {prefetch_count, 1}

If this number is zero there is no limit. The default is 1000.
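
A complete AMQP 0.9.1 source clause combining the properties above could be sketched like this. The queue name and the prefetch value are illustrative only:

```erlang
%% Sketch of an AMQP 0.9.1 source; my_work_queue is a placeholder name.
{source,
  [ {protocol, amqp091},
    {uris, ["amqp://"]},                  % direct connection to the local broker
    {declarations, [ {'queue.declare', [ {queue, <<"my_work_queue">>},
                                         durable ]} ]},
    {queue, <<"my_work_queue">>},         % the queue declared above
    {prefetch_count, 50}                  % at most 50 unacknowledged messages at a time
  ]}
```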

AMQP 1.0 source

source_address

This clause is mandatory. It takes the form:

  {source_address, <<"my-address">>}

This represents the source address of the sending AMQP 1.0 link.

prefetch_count

This clause is optional and sets the link credit amount that will be granted to the receiving link. The credit will be automatically renewed when it falls below a tenth of this value. The default is 1000. It takes the form:

  {prefetch_count, 10}
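
For illustration, an AMQP 1.0 source clause using both properties might look like the following sketch. The hostname and address are placeholders:

```erlang
%% Sketch of an AMQP 1.0 source; host and address are hypothetical.
{source,
  [ {protocol, amqp10},
    {uris, ["amqp://broker.example.com:5672"]},
    {source_address, <<"orders">>},
    {prefetch_count, 100}    % link credit of 100, renewed when it falls below 10
  ]}
```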

destination

Destination is a mandatory clause and has different properties for different protocols. Two properties are common across all protocols:

  {destination,
      [ {protocol, amqp091 | amqp10},
        {uris, uri_list}
      ]}

AMQP 0.9.1 destination

publish_properties

This clause is optional. It takes the form:

  {publish_properties, property_list}

where the properties in the list are set on the basic.properties of each message before it is re-published.

For example:

  {publish_properties, [ {delivery_mode, 2} ]}

would mark all re-published messages persistent.

By default the properties of the message are preserved, but this clause can be used to change or set any property, including content_type, content_encoding, headers, delivery_mode, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id and cluster_id.
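
Several properties can be overridden at once. In the sketch below the content type and application id values are made up for illustration; properties not listed keep the values they had on the original message:

```erlang
%% Override three properties on every re-published message.
{publish_properties, [ {delivery_mode, 2},                        % persistent
                       {content_type, <<"application/json">>},   % hypothetical value
                       {app_id, <<"orders-shovel">>}             % hypothetical value
                     ]}
```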

add_forward_headers

This clause is optional. It takes the form:

  {add_forward_headers, boolean}

If add_forward_headers is set to true, an x-shovelled header is added or appended to the message before it is re-published.

The default is not to add such a header.

add_timestamp_header

This clause is optional. It takes the form:

  {add_timestamp_header, boolean}

If add_timestamp_header is set to true, an x-shovelled-timestamp header is added to the message before it is re-published. The header value is the time (in seconds since the epoch) at which the message was shovelled.

The default is not to add such a header.
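
Both header options can be enabled together. A minimal sketch of an AMQP 0.9.1 destination that does so, connecting directly to the local broker:

```erlang
%% Sketch: add both shovel headers to every re-published message.
{destination,
  [ {protocol, amqp091},
    {uris, ["amqp://"]},
    {add_forward_headers, true},     % adds or appends the x-shovelled header
    {add_timestamp_header, true}     % adds the x-shovelled-timestamp header
  ]}
```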

publish_fields

This clause is optional. It takes the form:

  {publish_fields, property_list}

where the properties in the list are used to set the fields on the basic.publish method used to re-publish messages.

By default the messages are re-published using their original exchange name and routing key. By specifying, for example:

  {publish_fields, [ {exchange, <<"my_exchange">>},
                     {routing_key, <<"from_shovel">>}
                   ]}

messages would be re-published to an explicit exchange name with an explicit, fixed routing key.

AMQP 1.0 destination

target_address

This clause is mandatory. It takes the form:

  {target_address, <<"my-address">>}

This represents the target address of the sending AMQP 1.0 link.

properties

This clause is optional. It takes the form:

  {properties, property_list}

This clause includes any additional properties to be added when re-publishing a message. The available keys include message_id, user_id, to, subject, reply_to, correlation_id, content_type, content_encoding, absolute_expiry_time, creation_time. See the AMQP 1.0 spec (§3.2.4) for all the available keys and values.

application_properties

This clause is optional. It takes the form:

  {application_properties, property_list}

This declares any additional application properties to be added when re-publishing a message. Keys and values should be binary strings.

message_annotations

This clause is optional. It takes the form:

  {message_annotations, property_list}

This declares any additional message annotations to be added when re-publishing a message.

add_forward_headers

This clause is optional. It takes the form:

  {add_forward_headers, boolean}

When set to true it will add application properties for the following keys: shovelled-by, shovel-type, shovel-name.

add_timestamp_header

This clause is optional. It takes the form:

  {add_timestamp_header, boolean}

When set to true it will set the creation_time property to the time the message was shovelled.
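
The AMQP 1.0 destination properties can be combined as in the sketch below. The hostname, address and property values are placeholders, and the binary string value for content_type is an assumption that follows the style of the user_id example later in this guide:

```erlang
%% Sketch of an AMQP 1.0 destination; host, address and values are hypothetical.
{destination,
  [ {protocol, amqp10},
    {uris, ["amqp://broker.example.com:5672"]},
    {target_address, <<"orders">>},
    {properties, [ {content_type, <<"application/json">>} ]},
    {application_properties, [ {<<"origin">>, <<"eu-datacentre">>} ]},
    {add_forward_headers, true},     % adds shovelled-by, shovel-type, shovel-name
    {add_timestamp_header, true}     % sets creation_time to the time of shovelling
  ]}
```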

Example Configuration

A verbose shovel configuration between AMQP 0.9.1 endpoints might look like this:

  {rabbitmq_shovel,
    [ {shovels, [ {my_first_shovel,
                    [ {source,
                        [ {protocol, amqp091},
                          {uris, [ "amqp://fred:secret@host1.domain/my_vhost",
                                   "amqp://john:secret@host2.domain/my_vhost" ]},
                          {declarations, [ {'exchange.declare',
                                              [ {exchange, <<"my_fanout">>},
                                                {type, <<"fanout">>},
                                                durable
                                              ]},
                                           {'queue.declare',
                                              [{arguments,
                                                 [{<<"x-message-ttl">>, long, 60000}]}]},
                                           {'queue.bind',
                                              [ {exchange, <<"my_fanout">>},
                                                {queue,    <<>>}
                                              ]}
                                            ]},
                          {queue, <<>>},
                          {prefetch_count, 10}
                        ]},
                      {destination,
                        [ {protocol, amqp091},
                          {uris, ["amqp://"]},
                          {declarations, [ {'exchange.declare',
                                              [ {exchange, <<"my_direct">>},
                                                {type, <<"direct">>},
                                                durable
                                              ]}
                                         ]},
                         {publish_properties, [ {delivery_mode, 2} ]},
                         {add_forward_headers, true},
                         {publish_fields, [ {exchange, <<"my_direct">>},
                                            {routing_key, <<"from_shovel">>}
                                           ]}
                            ]},
                      {ack_mode, on_confirm},
                      {reconnect_delay, 5}
                    ]}
                ]}
    ]}

The configuration above defines a single shovel called 'my_first_shovel'.

'my_first_shovel' will connect to a broker on either host1 or host2 (as source), and directly to the local broker (as destination). It will reconnect to the other source broker on failure, after a delay of 5 seconds.

When connected to the source it will declare a durable, fanout exchange called "my_fanout", an anonymous queue with a per-queue message TTL, and bind the queue to the exchange.

When connected to the destination (the local broker) it will declare a durable, direct exchange called "my_direct".

This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key "from_shovel". The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.

The shovel consumer will not be allowed to hold more than ten unacknowledged messages at a time.

Example Configuration (1.0 source - 0.9.1 destination)

A verbose shovel configuration between an AMQP 1.0 source and an AMQP 0.9.1 destination might look like this:

{rabbitmq_shovel,
 [ {shovels, [ {my_first_shovel,
                [ {source,
                   [ {protocol, amqp10},
                     {uris, [ "amqp://fred:secret@host1.domain/my_vhost"
                            ]},
                     {source_address, <<"my-source">>},
                     {prefetch_count, 10}
                   ]},
                  {destination,
                     [ {protocol, amqp091},
                       {uris, ["amqp://"]},
                       {declarations, [ {'exchange.declare',
                                         [ {exchange, <<"my_direct">>},
                                           {type, <<"direct">>},
                                           durable
                                         ]}
                                      ]},
                       {publish_properties, [ {delivery_mode, 2} ]},
                       {add_forward_headers, true},
                       {publish_fields, [ {exchange, <<"my_direct">>},
                                          {routing_key, <<"from_shovel">>}
                                        ]}
                     ]},
                  {ack_mode, on_confirm},
                  {reconnect_delay, 5}
                ]}
             ]}
 ]}

Example Configuration (0.9.1 source - 1.0 destination)

A verbose shovel configuration between an AMQP 0.9.1 source and an AMQP 1.0 destination might look like this:

{rabbitmq_shovel,
 [{shovels, [{my_first_shovel,
              [{source,
                [{protocol, amqp091},
                 {uris, ["amqp://fred:secret@host1.domain/my_vhost",
                         "amqp://john:secret@host2.domain/my_vhost"]},
                 {declarations, [{'exchange.declare',
                                    [{exchange, <<"my_fanout">>},
                                     {type, <<"fanout">>},
                                     durable]},
                                 {'queue.declare',
                                    [{arguments,
                                       [{<<"x-message-ttl">>, long, 60000}]}]},
                                 {'queue.bind',
                                    [{exchange, <<"my_fanout">>},
                                     {queue,    <<>>}
                                    ]}
                                ]},
                 {queue, <<>>},
                 {prefetch_count, 10}
                ]},
               {destination,
                [{protocol, amqp10},
                 % Note: for plain text SASL authentication
                 % {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
                 % Note: for anonymous access
                 {uris, ["amqp://host:5672"]},
                 {properties, [{user_id, <<"my-user">>}]},
                 {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
                 {add_forward_headers, true},
                 {target_address, <<"destination-queue">>}
                ]},
               {ack_mode, on_confirm},
               {reconnect_delay, 5}
              ]}
            ]}
 ]}
