
This page gives an overview of the RabbitMQ Java client API.
The code samples given here demonstrate connecting to AMQP brokers and using RPC services exposed via AMQP.
For more details, please see the relevant Javadoc documentation.
The client API is closely modelled on the AMQP protocol specification, with little additional abstraction.
For more detail on the classes used in this document, please see the Javadoc documentation.
The core API classes are Connection
and Channel, representing an AMQP connection and an
AMQP data channel, respectively:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
The holder class AMQP stores all the code
generated automatically from the AMQP XML protocol
definition specification. It contains all required
content-class-specific content header definitions (such
as AMQP.BasicProperties) and all the
method request and response descriptors (such
as AMQP.Basic.Publish
and AMQP.Queue.BindOk), as well as useful
protocol-specific constants and other values.
For details and exact definitions, please see the AMQP specification document.
import com.rabbitmq.client.AMQP;
The following code connects to an AMQP broker using the given parameters (host name, port number, etc) :
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHostName(hostName); factory.setPortNumber(portNumber); Connection conn = factory.newConnection();
All of these parameters have sensible defaults for a stock RabbitMQ server running locally.
The Connection interface can then be used to open a channel:
Channel channel = conn.createChannel();
The channel can now be used to send and receive messages, as described in subsequent sections.
To disconnect, simply close the channel and the connection:
channel.close(); conn.close();
Note that closing the channel may be considered good practice, but isn't strictly necessary here - it will be done automatically anyway when the underlying connection is closed.
Client applications work with exchanges and queues, the high-level building blocks of AMQP. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary.
Continuing the previous example, the following code declares an exchange and a queue, then binds them together.
channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, routingKey);
This will actively declare the following objects, both of which can be customised by using additional parameters. Here neither of them have any special arguments.
The above function calls then bind the queue to the exchange with the given routing key.
Note that this would be a typical way to declare a queue when only one client wants to work with it: it doesn't need a well-known name, no other client can use it (exclusive) and will be cleaned up automatically (autodelete). If several clients want to share a queue with a well-known name, this code would be appropriate:
channel.exchangeDeclare(exchangeName, "direct", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey);
This will actively declare:
Note that all of these Channel API methods are overloaded.
These convenient short forms of exchangeDeclare, queueDeclare and queueBind
use sensible defaults. There are also longer forms with more parameters, to let you override these defaults
as necessary, giving full control where needed.
This "short version, long version" pattern is used throughout the client API uses.
To publish a message to an exchange, use Channel.basicPublish as follows:
byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
For fine control, you can use overloaded variants to specify the mandatory and immediate
flags, or send messages with basic-class header properties :
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
This sends a message with delivery mode 2 (persistent) and content-type "text/plain". You can specify as many parameters as you like :
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties
(contentType, contentEncoding, headers, deliveryMode,
priority, correlationId, replyTo, expiration,
messageId, timestamp, type, userId,
appId, clusterId),
messageBodyBytes);
Here any or all of the parameters to the BasicProperties constructor may be null.
Note also that BasicProperties is an inner class of the autogenerated
holder class AMQP.
In general, Channel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for Channel instances. If more than one thread needs to access a particular Channel instance, the application should enforce mutual exclusion itself, for example by synchronising on the Channel.
Symptoms of incorrect serialisation of Channel operations include, but are not limited to,
IllegalStateExceptions with the message "cannot execute more than one synchronous AMQP command at a time", and UnexpectedFrameErrors.
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.QueueingConsumer;
The most efficient way to receive messages is to set up a
subscription using the Consumer
interface. The messages will then be delivered
automatically as they arrive, rather than having to be
requested proactively.
The easiest and safest supplied implementation of
Consumer is the QueueingConsumer
convenience class:
boolean noAck = false;
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, noAck, consumer);
while (/* decide whether to continue reading */) {
QueueingConsumer.Delivery delivery;
try {
delivery = consumer.nextDelivery();
} catch (InterruptedException ie) {
continue;
}
// (process the message components ...)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
If, instead, you decide to implement the
Consumer interface yourself, be warned that
its methods will be called from within the connection's
thread, and so they must not in turn invoke any blocking
AMQP operation (such as queueDeclare,
txCommit, basicCancel or
basicPublish). If they do, the channel will
deadlock. Only asynchronous AMQP operations without content
are safe for use within callbacks, such as basicAck.
It's partly because of this complication that using
QueueingConsumer is the safest way of
receiving messages by subscription.
If, with the above caveats in mind, you do decide to
implement the Consumer interface yourself,
one good option is to subclass the convenience class
DefaultConsumer, overriding methods as
necessary. You will generally want to override the core
interface method handleDelivery:
boolean noAck = false;
channel.basicConsume(ticket, queueName, noAck,
new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components ...)
channel.basicAck(deliveryTag, false); // asynchronous, thus safe!
}
});
More sophisticated consumers will need to override further
methods. In particular, handleShutdownSignal
traps channel / connection closure, and
handleConsumeOk is passed a server-generated
consumer tag when none is supplied to the initial
basicConsume call.
Consumers can also implement the
handleCancelOk method to be notified of
cancellations.
You can cancel an active consumer with
Channel.basicCancel:
channel.basicCancel(consumerTag);
When calling the API methods relating to
Consumers, individual subscriptions are
always referred to by their consumer tags, which can be
either client- or server-generated as explained in the AMQP specification
document.
To retrieve individual messages, use
Channel.basicGet. The returned value is an
instance of GetResponse, from which the
header information (properties) and message body can be
extracted :
boolean noAck = false;
GetResponse response = channel.basicGet(queueName, noAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
Since the noAck = false above,
you must also call Channel.basicAck to
acknowledge that you have successfully received and
processed the message :
...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}
If a message is published with the "mandatory" or "immediate" flags set, but cannot be
delivered, the broker will return it to the sending client
(via a AMQP.Basic.Return command).
To be notified of such returns, clients can implement the ReturnListener
interface and calling Channel.setReturnListener.
If the client has not configured a return listener for a particular channel,
then the associated returned messages will be silently dropped.
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});
This return listener will be called, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.
As a programming convenience, the Java client API offers a
class RpcClient which uses a temporary reply
queue to provide simple RPC-style communication facilities via AMQP.
The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.
import com.rabbitmq.client.RpcClient; RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);
(The implementation details of how this class uses AMQP are as follows: request messages are sent with the
basic.correlation_id field set to a value unique for this RpcClient instance,
and with basic.reply_to set to the name of the reply queue.)
Once you have created an instance of this class, you can use it to send RPC requests by using any of the following methods:
byte[] primitiveCall(byte[] message); String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)
The primitiveCall method transfers raw byte arrays as the request and response
bodies. The method stringCall is a thin
convenience wrapper around primitiveCall,
treating the message bodies as String instances
in the default character encoding.
The mapCall variants are a little more sophisticated: they encode
a java.util.Map containing ordinary Java values
into an AMQP binary table representation, and decode the
response in the same way. (Note that there are some restrictions on what value
types can be used here - see the javadoc for details.)
All the marshalling/unmarshalling convenience methods use primitiveCall as a
transport mechanism, and just provide a wrapping layer on top of it.
The AMQP connection and channel share the same general approach to managing network failure, internal failure, and explicit local shutdown.
The AMQP connection and channel have the following lifecycle states:
open: the object is ready to use
closing: the object has been explicitly
notified to shut down locally, has issued a shutdown
request to any supporting lower-layer objects, and is
waiting for their shutdown procedures to complete
closed: the object has received all
shutdown-complete notification(s) from any lower-layer
objects, and as a consequence has shut itself down
Those objects always end up in the closed state, regardless of the reason that casued the closure, like an application request, an internal client library failure, a remote network request or network failure.
The AMQP connection and channel objects possess the following shutdown-related methods:
addShutdownListener(ShutdownListener
listener) and
removeShutdownListener(ShutdownListener
listener), to manage any listeners, which will
be fired when the object transitions to
closed state. Note that, adding a
ShutdownListener to an object that is already closed
will fire the listener immediately
getCloseReason(), to allow the
investigation of what was the reason of the object's
shutdown
isOpen(), useful for testing whether the
object is in an open state
close(int closeCode, String
closeMessage), to explictly notify the object
to shut down
Simple usage of listeners would look like:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});
One can retrieve the
ShutdownSignalException, which contains all
the information available about the close reason, either
by explictly calling the getCloseReason()
method or by using the cause parameter in
the service(ShutdownSignalException cause)
method of the ShutdownListener class.
The ShutdownSignalException class provides
methods to analyze the reason of the shutdown. By
calling the isHardError() method we get
information whether it was a connection or a channel
error.
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Object reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}
Use of the isOpen() method of channel and
connection objects is not recommended for production
code, because the value returned by the method is
dependent on the existence of the shutdown cause. The
following code illustrates the possibility of race
conditions:
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and txCommit() call
...
channel.txCommit();
}
}
Instead, we should normally ignore such checking, and
simply attempt the action desired. If during the
execution of the code the channel of the connection is
closed, a ShutdownSignalException will be
thrown indicating that the object is in an invalid
state. We should also catch for IOException
caused either by SocketException, when
broker closes the connection unexpectedly, or
ShutdownSignalException, when broker
initiated clean close.
public void validMethod(Channel channel)
{
try {
...
channel.txCommit();
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}