Erlang AMQP Client library

This is the programmer's guide to the Erlang AMQP client.

The AMQP client provides an Erlang interface to compliant AMQP brokers. The client follows the AMQP execution model and implements the wire-level marshaling required to encode and decode AMQP commands in a protocol-conformant fashion. AMQP is a connection-oriented protocol and multiplexes parallel interactions via multiple channels within a connection.

This user guide assumes that the reader is familiar with basic concepts of AMQP and understands exchanges, queues and bindings. This information is contained in the protocol documentation on the AMQP website. For details and exact definitions, please see the AMQP specification document.

The basic usage of the client follows these broad steps:

  1. Establish a connection to a broker
  2. Create a new channel within the open connection
  3. Execute AMQP commands on the channel, such as sending and receiving messages, creating exchanges and queues, or defining routing rules between exchanges and queues
  4. When no longer required, close the channel and the connection

Obtain the library source and related materials from the download directory.

Programming Model

The two main modules in the client library are:

  1. amqp_connection, which is used to open connections to a broker and create channels
  2. amqp_channel, which is used to send and receive AMQP commands

Once a connection has been established, and a channel has been opened, an application will typically use the amqp_channel:call/{2,3} and amqp_channel:cast/{2,3} functions to achieve most of the things it needs to do.

The library is made up of two layers:

  1. A high level logical layer that follows the AMQP execution model
  2. A low level driver layer that is responsible for providing a physical transport to a broker

There are two drivers in the client library:

  1. The network driver establishes a TCP connection to a protocol compliant AMQP broker and encodes each command according to the specification. To use this driver, start a connection using amqp_connection:start/1 with the parameter set to an #amqp_params_network record.
  2. The direct driver uses native Erlang messaging instead of sending AMQP encoded commands over TCP. This approach avoids the overhead of marshaling commands onto and off the wire. However, the direct driver can only be used in conjunction with the RabbitMQ broker and the client code must be deployed into the same Erlang cluster. To use the direct driver, start a connection using amqp_connection:start/1 with the parameter set to an #amqp_params_direct record.

At run-time, the client library re-uses a subset of the functionality from the RabbitMQ broker. In order to keep a client deployment independent of RabbitMQ, the client build process produces an archive containing all of the common modules. This archive is then put onto the load path of the client application.

For more detailed information on the API, please refer to the reference documentation.

Furthermore, the test suite that is part of the source distribution of the client library contains many complete examples of how to program against the API.

AMQP Commands

The general mechanism of interacting with the broker is to send and receive AMQP commands that are defined in the protocol documentation. During the build process, the machine readable version of the AMQP specification is used to auto-generate Erlang records for each command. The code generation process also defines sensible default values for each command. Using default values allows the programmer to write terser code - it is only necessary to override a field if you require non-default behaviour. The definition of each command can be consulted in the rabbit_framing.hrl header file. For example, when using the #'exchange.declare'{} command, specifying the following:

      #'exchange.declare'{exchange = <<"my_exchange">>}
    
is equivalent to this:
      #'exchange.declare'{exchange    = <<"my_exchange">>,
                          ticket      = 0,
                          type        = <<"direct">>,
                          passive     = false,
                          durable     = false,
                          auto_delete = false,
                          internal    = false,
                          nowait      = false,
                          arguments   = []}
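
As a small illustration of overriding defaults, the sketch below declares a durable fanout exchange and sets only the fields that differ from the generated defaults. The module name and the exchange name logs are placeholders invented for this example, not part of the library.

```erlang
-module(declare_sketch).            %% hypothetical module name
-include("amqp_client.hrl").
-export([durable_fanout/0]).

%% Build the command record. Fields that are not mentioned here
%% (passive, auto_delete, internal, nowait, arguments, ...) keep the
%% defaults generated from the specification.
durable_fanout() ->
    #'exchange.declare'{exchange = <<"logs">>,     %% placeholder name
                        type     = <<"fanout">>,
                        durable  = true}.
```

The returned record can be passed to amqp_channel:call/2 like any other command.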
    

Including Header Files

The Erlang client uses a number of record definitions which you will encounter in this guide. These records fall into two broad categories:

  1. Auto-generated AMQP command definitions from the machine readable version of the specification
  2. Definitions of data structures that are commonly used throughout the client

To gain access to these records, you need to include amqp_client.hrl in every module that uses the Erlang client:
      -include("amqp_client.hrl").
    

Connecting To A Broker

The amqp_connection module is used to start a connection to the broker:

       {ok, Connection} = amqp_connection:start(#amqp_params_network{})
     

This function returns {ok, Connection}, where Connection is the pid of a process that maintains a permanent connection to the broker.

In case of an error, the above call returns {error, Error}.
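
Because the call can fail, an application may prefer a case expression over a bare pattern match, so that a refused connection does not crash the caller. The following is a minimal sketch; the module name and the logging choice are assumptions made for illustration.

```erlang
-module(connect_sketch).            %% hypothetical module name
-include("amqp_client.hrl").
-export([connect/0]).

%% Try to connect with the default network parameters and report
%% failures instead of crashing on a badmatch.
connect() ->
    case amqp_connection:start(#amqp_params_network{}) of
        {ok, Connection} ->
            {ok, Connection};
        {error, Reason} ->
            %% For example, the broker is not running or it refused
            %% the credentials.
            error_logger:error_msg("AMQP connection failed: ~p~n", [Reason]),
            {error, Reason}
    end.
```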

An AMQP broker contains objects organised into groups called virtual hosts. The concept of virtual hosts gives an administrator the facility to partition a broker resource into separate domains and restrict access to the objects contained within these groups. AMQP connections require client authentication and the authorisation to access specific virtual hosts.

The #amqp_params_network record sets the following default values:

  Parameter           Default Value
  username            guest
  password            guest
  virtual_host        /
  host                localhost
  port                5672
  channel_max         0
  frame_max           0
  heartbeat           0
  ssl_options         none
  auth_mechanisms     [fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3]
  client_properties   []

These values are only the defaults that will work with an out of the box broker running on the same host. If the broker or the environment has been configured differently, these values can be overridden to match the actual deployment scenario.
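
For instance, a deployment with a remote broker might override several fields at once. This is only a sketch: the host name, virtual host, credentials and heartbeat interval are made-up values, and the module name is hypothetical.

```erlang
-module(params_sketch).             %% hypothetical module name
-include("amqp_client.hrl").
-export([connect/0]).

%% Connect to a broker that does not use the out-of-the-box settings.
%% Every value below is a placeholder for illustration.
connect() ->
    Params = #amqp_params_network{host         = "broker.example.com",
                                  port         = 5672,
                                  virtual_host = <<"/staging">>,
                                  username     = <<"app_user">>,
                                  password     = <<"secret">>,
                                  heartbeat    = 30},
    %% Fields that are not listed keep the defaults shown in the table.
    amqp_connection:start(Params).
```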

SSL options can also be specified globally using the ssl_options environment key for the amqp_client application. They will be merged with the SSL parameters from the URI (the latter will take precedence).

If a client wishes to run inside the same Erlang cluster as the RabbitMQ broker, it can start a direct connection that optimises away the AMQP codec. To start a direct connection, use amqp_connection:start/1 with the parameter set to an #amqp_params_direct record.

Providing a username and password is optional, since the direct client is considered trusted anyway. If a username and password are provided then they will be checked and made available to authentication backends. If a username is supplied, but no password, then the user is considered trusted and logged in unconditionally. If neither username nor password is provided then the connection will be considered to be from a "dummy" user which can connect to any virtual host and issue any AMQP command.

The #amqp_params_direct record sets the following default values:

  Parameter           Default Value
  username            none
  password            none
  virtual_host        /
  node                node()
  client_properties   []
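
A direct connection to a broker running on another node of the same cluster could be sketched as follows. The node name is invented for this example, and the sketch assumes the calling node is already part of the broker's Erlang cluster.

```erlang
-module(direct_sketch).             %% hypothetical module name
-include("amqp_client.hrl").
-export([connect/0]).

%% Start a direct connection. 'rabbit@broker-host' is a made-up node
%% name; omit the node field to target the local node, which is the
%% record default.
connect() ->
    Params = #amqp_params_direct{node = 'rabbit@broker-host'},
    amqp_connection:start(Params).
```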

Connecting To A Broker with AMQP URIs

Instead of working with the #amqp_params_* records directly, AMQP URIs may be used. The amqp_uri:parse/1 function is provided for this purpose. It parses a URI and returns the equivalent #amqp_params_* record. Diverging from the spec, if the hostname is omitted, the connection is assumed to be direct and an #amqp_params_direct{} record is returned. In addition to the standard host, port, user, password and vhost parameters, extra parameters may be specified via the query string (e.g. "?heartbeat=5").
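
A minimal sketch of URI-based connection setup is shown here. It assumes that amqp_uri:parse/1 returns {ok, Params} on success and an {error, ...} tuple otherwise; the module name and the example URI in the comment are illustrative.

```erlang
-module(uri_sketch).                %% hypothetical module name
-export([connect/1]).

%% Uri is a string such as
%%   "amqp://guest:guest@localhost:5672/%2f?heartbeat=5"
%% where %2f is the URI-encoded name of the default virtual host "/".
connect(Uri) ->
    case amqp_uri:parse(Uri) of
        {ok, Params} ->
            %% Params is an #amqp_params_network{} record, or an
            %% #amqp_params_direct{} record when the host was omitted.
            amqp_connection:start(Params);
        {error, _} = Error ->
            Error
    end.
```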

Creating Channels

Once a connection to the broker has been established, the amqp_connection module can be used to create channels:

      {ok, Channel} = amqp_connection:open_channel(Connection)
    
This function takes the pid of the connection process and returns {ok, Channel}, where Channel is a pid that encapsulates a server side channel.

Managing Exchanges And Queues

Once a channel has been established, the amqp_channel module can be used to manage the fundamental objects within AMQP, namely exchanges and queues. The following code creates an exchange called my_exchange, which, by default, is a direct exchange:

      Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
      #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
    
Similarly, a queue called my_queue is created by this code:
      Declare = #'queue.declare'{queue = <<"my_queue">>},
      #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)
    
In many scenarios, a client is not interested in the actual name of the queue it wishes to receive messages from. In this case, it is possible to let the broker generate a random name for a queue. To do this, send a #'queue.declare'{} command and leave the queue attribute undefined:
      #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})
    
The server will auto-generate a queue name and return this name as part of the acknowledgement.

To create a routing rule from an exchange to a queue, the #'queue.bind'{} command is used:

      Binding = #'queue.bind'{queue       = Queue,
                              exchange    = Exchange,
                              routing_key = RoutingKey},
      #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
    
When this routing rule is no longer required, this route can be deleted using the #'queue.unbind'{} command:
      Binding = #'queue.unbind'{queue       = Queue,
                                exchange    = Exchange,
                                routing_key = RoutingKey},
      #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
    
An exchange can be deleted by the #'exchange.delete'{} command:
      Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
      #'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)
    
Similarly, a queue is deleted using the #'queue.delete'{} command:
      Delete = #'queue.delete'{queue = <<"my_queue">>},
      #'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)
    

Note that we used amqp_channel:call/2 in the examples above, since we sent AMQP synchronous methods. It is generally advisable to use amqp_channel:call/{2,3} for synchronous methods, rather than amqp_channel:cast/{2,3}, even though both functions work with both sync and async method types. The difference between the two functions is that amqp_channel:call/{2,3} blocks the calling process until the reply comes back from the server (for sync methods) or the method has been sent on the wire (for async methods), whereas amqp_channel:cast/{2,3} returns 'ok' immediately. Thus, only by using amqp_channel:call/{2,3} can we check that the broker has acknowledged our command.
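
The difference can be seen side by side in the following sketch, which declares a queue synchronously and then publishes to it asynchronously. The module name, queue name and payload are placeholders chosen for this example.

```erlang
-module(call_cast_sketch).          %% hypothetical module name
-include("amqp_client.hrl").
-export([declare_and_publish/1]).

declare_and_publish(Channel) ->
    %% queue.declare is a synchronous method: call/2 blocks until the
    %% broker answers, so the reply record can be pattern matched.
    Declare = #'queue.declare'{queue = <<"my_queue">>},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),

    %% basic.publish is an asynchronous method: cast/3 returns ok
    %% immediately and says nothing about whether the broker has
    %% received the message.
    Publish = #'basic.publish'{exchange    = <<>>,
                               routing_key = <<"my_queue">>},
    ok = amqp_channel:cast(Channel, Publish, #amqp_msg{payload = <<"hello">>}).
```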

Sending Messages

To send a message to an exchange with a particular routing key, the #'basic.publish'{} command in conjunction with the #amqp_msg{} record is used:

      Payload = <<"foobar">>,
      Publish = #'basic.publish'{exchange = X, routing_key = Key},
      amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})
    
By default, the properties field of the #amqp_msg{} record contains a minimal implementation of the #'P_basic'{} properties structure. If an application needs to override any of the defaults, for example, to send persistent messages, the #amqp_msg{} needs to be constructed accordingly:
      Payload = <<"foobar">>,
      Publish = #'basic.publish'{exchange = X, routing_key = Key},
      Props = #'P_basic'{delivery_mode = 2}, %% persistent message
      Msg = #amqp_msg{props = Props, payload = Payload},
      amqp_channel:cast(Channel, Publish, Msg)
    
The full list of message headers is explained in the AMQP protocol documentation.

Remember that the AMQP #'basic.publish' command is asynchronous. This means that the server will not send a response to it, unless the message is not deliverable. In this case, the message will be returned to the client. This operation is described in the handling returns section.

Receiving Messages

The simplest way to receive a message is to poll an existing queue. This is achieved using the #'basic.get'{} command:

      Get = #'basic.get'{queue = Q, no_ack = true},
      {#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
      #amqp_msg{payload = Payload} = Content
    
The payload that is returned is an Erlang binary, and it is up to the application to decode it, as the structure of this content is opaque to the AMQP protocol.

If the queue is empty when the #'basic.get'{} command is invoked, the channel returns a #'basic.get_empty'{} message, as illustrated here:

     #'basic.get_empty'{} = amqp_channel:call(Channel, Get)
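
Since a poll can produce either reply, application code usually needs to handle both. One way to do that is a case expression, sketched here with a hypothetical module name and return values chosen for this example.

```erlang
-module(poll_sketch).               %% hypothetical module name
-include("amqp_client.hrl").
-export([poll/2]).

%% Poll queue Q once. Returns {ok, Payload} when a message was
%% available and the atom empty otherwise.
poll(Channel, Q) ->
    Get = #'basic.get'{queue = Q, no_ack = true},
    case amqp_channel:call(Channel, Get) of
        {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} ->
            {ok, Payload};
        #'basic.get_empty'{} ->
            empty
    end.
```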
    

Note that the previous example sets the no_ack flag on the #'basic.get'{} command. This tells the broker that the receiver will not send an acknowledgement of the message. In doing so, the broker can absolve itself of the responsibility for delivery - once it believes it has delivered a message, then it is free to assume that the consuming application has taken responsibility for it. In general, a lot of applications will not want these semantics; rather, they will want to explicitly acknowledge the receipt of a message. This is done with the #'basic.ack'{} command. The no_ack field of #'basic.get'{} is turned off by default, so the following example simply omits it:

      Get = #'basic.get'{queue = Q},
      {#'basic.get_ok'{delivery_tag = Tag}, Content}
          = amqp_channel:call(Channel, Get),
      %% Do something with the message payload, and then ack it
      amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})
    

Notice that we sent the #'basic.ack'{} command using amqp_channel:cast/2 instead of amqp_channel:call/2. This is because the broker will not send a response to an acknowledgement, i.e. it is a fire and forget command.

Receiving messages by polling a queue is not as efficient as subscribing a consumer to a queue, so this should be taken into consideration when receiving large volumes of messages.

Subscribing To Queues

As indicated in the "Receiving Messages" section, subscribing to a queue can be a more efficient means of consuming messages than the polling mechanism. To subscribe to a queue, the #'basic.consume'{} command is used in one of two forms:

      Sub = #'basic.consume'{queue = Q},
      #'basic.consume_ok'{consumer_tag = Tag} =
        amqp_channel:subscribe(Channel, Sub, Consumer)
    
or
      Sub = #'basic.consume'{queue = Q},
      #'basic.consume_ok'{consumer_tag = Tag} =
        amqp_channel:call(Channel, Sub) %% the caller is the subscriber
    

The consumer argument is the pid of a process to which the client library will deliver messages. This can be an arbitrary Erlang process, including the process that initiated the subscription. The #'basic.consume_ok'{} notification contains a tag that identifies the subscription. This is used at a later point in time to cancel the subscription. This notification is sent both to the process that created the subscription (as the return value to amqp_channel:subscribe/3) and as a message to the consumer process.

When a consumer process is subscribed to a queue, it will receive messages in its mailbox. An example receive loop looks like this:

      loop(Channel) ->
          receive
              %% This is the first message received
              #'basic.consume_ok'{} ->
                  loop(Channel);

              %% This is received when the subscription is cancelled
              #'basic.cancel_ok'{} ->
                  ok;

              %% A delivery
              {#'basic.deliver'{delivery_tag = Tag}, Content} ->
                  %% Do something with the message payload
                  %% (some work here)

                  %% Ack the message
                  amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

                  %% Loop
                  loop(Channel)
          end.
     
In this simple example, the process consumes the subscription notification and then proceeds to wait for delivery messages to arrive in its mailbox. When messages are received from the mailbox, the loop does something useful with the message and sends an acknowledgement of receipt back to the broker. If the subscription is cancelled, either by the consumer itself or some other process, a cancellation notification will be sent to the consumer process. In this scenario, the receive loop just exits. If the application does not wish to explicitly acknowledge message receipts, it should set the no_ack flag on the subscription request.
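
To show how a separate consumer process and the no_ack flag fit together, here is a minimal sketch that spawns a consumer, subscribes it without acknowledgements and prints each payload. The module and function names are invented for illustration.

```erlang
-module(consumer_sketch).           %% hypothetical module name
-include("amqp_client.hrl").
-export([start/2]).

%% Spawn a consumer process and subscribe it to queue Q with
%% no_ack = true, so the loop never sends basic.ack.
start(Channel, Q) ->
    Consumer = spawn(fun loop/0),
    Sub = #'basic.consume'{queue = Q, no_ack = true},
    #'basic.consume_ok'{consumer_tag = Tag} =
        amqp_channel:subscribe(Channel, Sub, Consumer),
    %% The tag is needed later to cancel the subscription.
    {Consumer, Tag}.

loop() ->
    receive
        %% Subscription confirmed
        #'basic.consume_ok'{} ->
            loop();
        %% Subscription cancelled: stop the process
        #'basic.cancel_ok'{} ->
            ok;
        %% A delivery; no acknowledgement is sent back
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
            io:format("received ~p~n", [Payload]),
            loop()
    end.
```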

To cancel a subscription, use the tag that the broker passed back with the #'basic.consume_ok'{} acknowledgement:
      amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})
    

Subscribing Internals

The channel uses a module implementing the amqp_gen_consumer behaviour to determine how it handles subscription-related events. Effectively, this module handles client-side consumer registration and routing of deliveries to the appropriate consumers.

For instance, the default consumer module, amqp_selective_consumer, keeps track of which processes are subscribed to which queues and routes deliveries appropriately; in addition, if the channel gives it a delivery for an unknown consumer, it will pass it to a default consumer, should one be registered.

By contrast, amqp_direct_consumer simply forwards all the messages it receives from the channel to its only registered consumer.

The consumer module for a channel is chosen when the channel is opened by setting the second parameter to amqp_connection:open_channel/2. The consumer module implements the amqp_gen_consumer behaviour and thus implements functions to handle receiving basic.consume, basic.consume_ok, basic.cancel, basic.cancel_ok methods as well as publishes.
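
As a rough sketch, a channel that forwards everything to the calling process through amqp_direct_consumer might be opened like this. The {Module, Args} shape of the second argument and the [Pid] argument list are assumptions made here and should be checked against the API documentation; the module name is hypothetical.

```erlang
-module(direct_consumer_sketch).    %% hypothetical module name
-export([open/1]).

%% Open a channel whose consumer module is amqp_direct_consumer, with
%% the calling process registered as its only consumer.
%% Assumption: the consumer is specified as {Module, InitArgs} and
%% amqp_direct_consumer expects [ConsumerPid] as its arguments.
open(Connection) ->
    amqp_connection:open_channel(Connection,
                                 {amqp_direct_consumer, [self()]}).
```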

See the API documentation for details.

Closing Channels And The Connection

When a channel is no longer required, a client should close it. This is achieved using amqp_channel:close/1:

        amqp_channel:close(Channel)
      
To close the connection, amqp_connection:close/1 is used:
        amqp_connection:close(Connection)
      

Both the #'channel.close'{} and #'connection.close'{} commands take the arguments reply_code (an integer) and reply_text (binary text), which can be set by the client depending on the reason why the channel or connection is being closed. In general, however, the reply_code is set to 200 to indicate a normal shutdown. The reply_text attribute is just an arbitrary string that the server may or may not log. If a client wants to set a different reply code and/or text, it can use the overloaded functions amqp_channel:close/3 and amqp_connection:close/3 respectively.
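
A shutdown that sets the reply code and text explicitly could look like the sketch below. It assumes the argument order (Pid, Code, Text) for both functions; the module name and the reply text are placeholders.

```erlang
-module(close_sketch).              %% hypothetical module name
-export([shutdown/2]).

%% Close the channel first, then the connection, passing an explicit
%% reply code and reply text. 200 indicates a normal shutdown.
shutdown(Connection, Channel) ->
    amqp_channel:close(Channel, 200, <<"Goodbye">>),
    amqp_connection:close(Connection, 200, <<"Goodbye">>).
```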

Complete Example

This shows a complete example:

      -module(amqp_example).

      -include("amqp_client.hrl").

      -export([test/0]).

      test() ->
          %% Start a network connection
          {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
          %% Open a channel on the connection
          {ok, Channel} = amqp_connection:open_channel(Connection),

          %% Declare a queue
          #'queue.declare_ok'{queue = Q}
              = amqp_channel:call(Channel, #'queue.declare'{}),

          %% Publish a message
          Payload = <<"foobar">>,
          Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
          amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

          %% Get the message back from the queue
          Get = #'basic.get'{queue = Q},
          {#'basic.get_ok'{delivery_tag = Tag}, Content}
               = amqp_channel:call(Channel, Get),

          %% Do something with the message payload
          %% (some work here)

          %% Ack the message
          amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

          %% Close the channel
          amqp_channel:close(Channel),
          %% Close the connection
          amqp_connection:close(Connection),

          ok.
    
In this example, a queue is created with a server generated name and a message is published directly to the queue. This makes use of the fact that every queue is bound to the default exchange via its own queue name. The message is then dequeued and acknowledged.

Client Deployment

The client build process produces two deployment archives:

  1. amqp_client.ez, which contains all of the AMQP client modules
  2. rabbit_common.ez, which contains the common modules from the server that are required at run-time

In order to use the client at run-time, you need to put both these archives onto the load path of your application. This can be done either by setting the ERL_LIBS variable to point at the directory that contains these archives or by referencing their containing directory with the -pa argument to the Erlang interpreter.

Say you were to put both amqp_client.ez and rabbit_common.ez into a directory called deps. To compile the example, you first need to unpack the archives in the deps directory:

      $ unzip -d deps deps/amqp_client.ez
      $ unzip -d deps deps/rabbit_common.ez
    
Then to compile the example code into the ebin directory:
      $ ERL_LIBS=deps erlc -o ebin amqp_example.erl
    
And then to run your application you could set the Erlang run-time like this:
      $ ERL_LIBS=deps erl -pa ebin
      Erlang R16B (erts-5.10.1) [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

      Eshell V5.10.1  (abort with ^G)
      1> amqp_example:test().
      ok
      2>
    

Delivery Flow Control

By default, there is no flow control within a channel other than normal TCP back-pressure. A consumer can set the size of the prefetch buffer that the broker will maintain for outstanding unacknowledged messages on a single channel. This is achieved using the #'basic.qos'{} command:

      amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
    
Applications typically should set the prefetch count, which means the processing speed of the consumer will exert back-pressure on the flow of messages in that channel.
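
Because basic.qos is a synchronous method, the broker's reply can be matched to confirm that the limit was accepted. The helper below is a small sketch with a hypothetical module name.

```erlang
-module(qos_sketch).                %% hypothetical module name
-include("amqp_client.hrl").
-export([limit_prefetch/2]).

%% Ask the broker to keep at most Prefetch unacknowledged messages
%% outstanding on this channel. The limit only matters for consumers
%% that acknowledge messages, i.e. that do not set no_ack.
limit_prefetch(Channel, Prefetch) ->
    #'basic.qos_ok'{} =
        amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch}),
    ok.
```

Calling this before subscribing, for example with a prefetch count of 10, means a slow consumer holds at most ten unacknowledged messages at a time.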

Blocked Connections

When an AMQP broker is running low on resources, for example by hitting a memory high watermark, it may choose to stop reading from publishers' network sockets.

RabbitMQ introduces a mechanism to allow clients to be told this has taken place - invoke amqp_connection:register_blocked_handler/2 giving the pid of a process to which #'connection.blocked'{} and #'connection.unblocked'{} messages may be sent.
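
A handler process could be sketched as follows. The module name and the log messages are invented for this example; a real application would typically pause and resume its publishers instead of printing.

```erlang
-module(blocked_sketch).            %% hypothetical module name
-include("amqp_client.hrl").
-export([watch/1]).

%% Register the calling process as the blocked handler and then wait
%% for notifications. This loop runs until the process is killed.
watch(Connection) ->
    amqp_connection:register_blocked_handler(Connection, self()),
    loop().

loop() ->
    receive
        #'connection.blocked'{} ->
            io:format("broker is blocking publishers~n"),
            loop();
        #'connection.unblocked'{} ->
            io:format("broker has unblocked publishers~n"),
            loop()
    end.
```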

Handling Returned Messages

The broker will return undeliverable messages back to the originating client. These are messages published with either the immediate or the mandatory flag set. In order for the application to get notified of a return, it must register a callback process that can process #'basic.return'{} commands. Here is an example of an unroutable message:

       amqp_channel:register_return_handler(Channel, self()),
       amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
       Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
                                  mandatory = true},
       amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
       receive
           {BasicReturn, Content} ->
               #'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
               %% Do something with the returned message
       end