RabbitMQ MQTT Adapter

RabbitMQ supports MQTT as of 3.0 (currently targeting version 3.1.1 of the spec).

Supported MQTT 3.1.1 features

MQTT clients can interoperate with clients of other protocols. The management UI and several other clients can be used with MQTT, although some limitations may apply and some defaults may need adjusting.

Enabling the Plugin

The MQTT adapter is included in the RabbitMQ distribution. To enable it, use rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_mqtt

After the plugin has been enabled, RabbitMQ needs restarting.

How it Works

The RabbitMQ MQTT plugin targets MQTT 3.1.1 and supports a broad range of MQTT clients. It also makes it possible for MQTT clients to interoperate with AMQP 0-9-1, AMQP 1.0, and STOMP clients. There is also support for multi-tenancy.

The plugin builds on the entities of RabbitMQ's core protocol: exchanges and queues. Messages published to MQTT topics use a topic exchange (amq.topic by default) internally. Subscribers consume from RabbitMQ queues bound to the topic exchange. This both enables interoperability with other protocols and makes it possible to use the Management plugin to inspect queue sizes, message rates, and so on.
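To make the topic exchange mapping more concrete, here is a minimal Python sketch of how an MQTT topic could be turned into a routing key for a topic exchange. The translation rules (`/` becomes `.`, `+` becomes `*`, `#` is kept) and the function name are assumptions made for illustration; they are not taken from the text above, and the sketch ignores topics that already contain dots.

```python
def mqtt_topic_to_routing_key(topic: str) -> str:
    """Translate an MQTT topic or topic filter into a topic-exchange routing key.

    Assumed rules (illustrative only):
      "/" (MQTT level separator)        -> "." (routing key word separator)
      "+" (MQTT single-level wildcard)  -> "*" (matches exactly one word)
      "#" (MQTT multi-level wildcard)   -> "#" (matches zero or more words)
    """
    return topic.replace("/", ".").replace("+", "*")


if __name__ == "__main__":
    for t in ("sports/football/epl", "sports/+/epl", "sports/#"):
        print(t, "->", mqtt_topic_to_routing_key(t))
```

Under these assumptions, an AMQP 0-9-1 consumer bound to amq.topic with a matching binding key would see the messages an MQTT client publishes.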

Subscription Durability

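MQTT 3.1 assumes two primary usage scenarios: transient clients that use transient (QoS0) subscriptions and publishes, and stateful clients that use durable (QoS1) subscriptions.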

This section briefly covers how these scenarios map to RabbitMQ queue durability and persistence features.

Transient (QoS0) subscriptions use non-durable, auto-delete queues that will be deleted when the client disconnects.

Durable (QoS1) subscriptions use durable queues. Whether the queues are auto-deleted is controlled by the client's clean session flag. Clients with clean sessions use auto-deleted queues, others use non-auto-deleted ones.

For transient (QoS0) publishes, the plugin will publish messages as transient (non-persistent). Naturally, for durable (QoS1) publishes, persistent messages will be used internally.

Queues created for MQTT subscribers will have names starting with mqtt-subscription-, one per subscription QoS level. The queues have a queue TTL that depends on the MQTT plugin configuration, 24 hours by default.
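The rules in this section can be summarised as a small decision function. The Python sketch below is illustrative only: the `QueueProperties` type, the function name, and the part of the queue name after the mqtt-subscription- prefix are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueProperties:
    name: str
    durable: bool
    auto_delete: bool


def subscription_queue(client_id: str, qos: int, clean_session: bool) -> QueueProperties:
    """Return the queue properties described above for one subscription QoS level."""
    if qos not in (0, 1):
        raise ValueError("this sketch only covers QoS0 and QoS1")
    # Hypothetical naming scheme: only the "mqtt-subscription-" prefix comes from the text.
    name = f"mqtt-subscription-{client_id}qos{qos}"
    if qos == 0:
        # Transient subscription: non-durable queue, deleted when the client disconnects.
        return QueueProperties(name, durable=False, auto_delete=True)
    # Durable subscription: auto-delete follows the client's clean session flag.
    return QueueProperties(name, durable=True, auto_delete=clean_session)
```

For example, `subscription_queue("sensor-1", 1, clean_session=False)` describes a durable queue that survives client disconnects until its TTL expires.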

Plugin Configuration

Here is a sample configuration that sets every MQTT option:

[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"amq.topic">>},
                  {subscription_ttl, 1800000},
                  {prefetch,         10},
                  {ssl_listeners,    []},
                  %% Default MQTT with TLS port is 8883
                  %% {ssl_listeners,    [8883]}
                  {tcp_listeners,    [1883]},
                  {tcp_listen_options, [{backlog,   128},
                                        {nodelay,   true}]}]}
].

Authentication

The default_user and default_pass options are used to authenticate the adapter in case MQTT clients provide no login credentials. If the allow_anonymous option is set to false then clients MUST provide credentials. The presence of client-supplied credentials over the network overrides the allow_anonymous option. Colons may not appear in usernames.
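As a rough illustration of the precedence described above, the following Python sketch picks the credentials that would be used for a connection. The function name is hypothetical, and treating a missing username or a missing password as "no credentials" is an assumption of this sketch.

```python
from typing import Optional, Tuple

DEFAULT_USER = "guest"  # mirrors default_user in the sample configuration
DEFAULT_PASS = "guest"  # mirrors default_pass in the sample configuration


def resolve_credentials(username: Optional[str], password: Optional[str],
                        allow_anonymous: bool = True) -> Tuple[str, str]:
    """Pick the credentials the adapter would authenticate with."""
    if username is not None and password is not None:
        # Client-supplied credentials take precedence over allow_anonymous.
        return username, password
    if allow_anonymous:
        # No credentials supplied: fall back to the configured defaults.
        return DEFAULT_USER, DEFAULT_PASS
    raise PermissionError("anonymous access is disabled; credentials are required")
```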

The vhost option controls which RabbitMQ vhost the adapter connects to. The vhost configuration is only consulted if no vhost is provided during connection establishment. You can optionally specify a vhost while connecting, by prepending the vhost to the username and separating with a colon.

For example, connecting with

/:guest

is equivalent to connecting with the default vhost and username.

mqtt-vhost:mqtt-username

means connecting to the vhost mqtt-vhost with username mqtt-username.
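The vhost prefix convention can be sketched in a few lines of Python. This is an illustrative helper with a hypothetical name; it splits on the last colon, which is safe only because colons may not appear in usernames.

```python
from typing import Tuple

DEFAULT_VHOST = "/"  # mirrors the vhost option in the sample configuration


def split_vhost_and_username(login: str, default_vhost: str = DEFAULT_VHOST) -> Tuple[str, str]:
    """Split an MQTT login of the form "vhost:username" into (vhost, username)."""
    vhost, sep, username = login.rpartition(":")
    if not sep:
        # No colon in the login: fall back to the configured vhost.
        return default_vhost, login
    return vhost, username
```

With this helper, the logins `guest` and `/:guest` both resolve to the default vhost, while `mqtt-vhost:mqtt-username` selects the vhost named before the colon.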

Host and Port

The tcp_listeners and tcp_listen_options options are interpreted in the same way as the corresponding options in the rabbit section, as explained in the broker configuration documentation.

TLS/SSL

The ssl_listeners option in the rabbitmq_mqtt config section controls the endpoint (if any) that the adapter accepts SSL connections on. The default MQTT SSL port is 8883. If this option is non-empty then the rabbit section of the configuration file must contain an ssl_options entry:

[{rabbit,        [
                  {ssl_options, [{cacertfile, "/path/to/tls/ca/cacert.pem"},
                                 {certfile,   "/path/to/tls/server/cert.pem"},
                                 {keyfile,    "/path/to/tls/server/key.pem"},
                                 {verify,     verify_peer},
                                 {fail_if_no_peer_cert, true}]}
                 ]},
 {rabbitmq_mqtt, [
                  {ssl_listeners,    [8883]},
                  {tcp_listeners,    [1883]}
                  ]}
].

Note that RabbitMQ rejects SSLv3 connections by default because that protocol is known to be compromised.

See the TLS/SSL configuration guide for details.
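On the client side, a matching TLS setup might look like the Python sketch below, which uses only the standard library `ssl` module. The function name and its parameters are hypothetical, any file paths passed in are placeholders, and the TLS 1.2 minimum is a choice made for this sketch rather than a broker requirement.

```python
import ssl
from typing import Optional


def make_client_tls_context(cafile: Optional[str] = None,
                            certfile: Optional[str] = None,
                            keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a TLS context for a client connecting to the MQTT TLS listener."""
    # PROTOCOL_TLS_CLIENT verifies the server certificate and hostname by default.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Never negotiate SSLv3 or early TLS versions.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if cafile:
        # CA certificate that signed the server certificate.
        ctx.load_verify_locations(cafile=cafile)
    if certfile:
        # Client certificate, needed when the server uses verify_peer
        # together with fail_if_no_peer_cert.
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx
```

The resulting context can be used to wrap a TCP socket connected to the TLS listener port (8883 in the example configuration) or handed to an MQTT client library that accepts an `ssl.SSLContext`.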

Authentication with SSL client certificates

The MQTT adapter can authenticate SSL-based connections by extracting a name from the client's SSL certificate, without using a password.

For safety the server must be configured with the SSL options fail_if_no_peer_cert set to true and verify set to verify_peer, to force all SSL clients to have a verifiable client certificate.

To switch this feature on, set ssl_cert_login to true for the rabbitmq_mqtt application. For example:

[
  {rabbitmq_mqtt, [{ssl_cert_login, true}]}
].

By default this will set the username to an RFC4514-ish string form of the certificate's subject's Distinguished Name, similar to that produced by OpenSSL's "-nameopt RFC2253" option.

To use the Common Name instead, add:

{rabbit, [{ssl_cert_login_from, common_name}]}

to your configuration.

Note that:

Session Stickiness (Clean and Non-clean Sessions) and Queue/Subscription TTL

The subscription_ttl option controls the lifetime of non-clean sessions. This option is interpreted in the same way as the queue TTL parameter, that is, in milliseconds, so the value 86400000 means 24 hours. To disable the TTL feature, set subscription_ttl to undefined in the configuration file:

[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"amq.topic">>},
                  {subscription_ttl, undefined},
                  {prefetch,         10},
                  ...
].

Note that disabling queue TTL carries a risk: short-lived clients that don't use clean sessions can leave queues and messages behind, which will consume resources and require manual cleanup.
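Because subscription_ttl is expressed in milliseconds, it is easy to misread a value by a factor of 60 or 1000. The following Python helpers (hypothetical names, for illustration only) show the arithmetic behind the values used in this guide.

```python
def hours_to_ms(hours: float) -> int:
    """Convert hours to the millisecond value expected by subscription_ttl."""
    return int(hours * 60 * 60 * 1000)


def ms_to_minutes(ms: int) -> float:
    """Convert a millisecond TTL back to minutes for a quick sanity check."""
    return ms / (60 * 1000)


if __name__ == "__main__":
    print(hours_to_ms(24))         # 24 hours in milliseconds
    print(ms_to_minutes(1800000))  # the sample configuration value, in minutes
```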

The prefetch option controls the maximum number of unacknowledged messages that will be delivered. This option is interpreted in the same way as the AMQP 0-9-1 prefetch-count field, so a value of 0 means "no limit".
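The prefetch semantics can be expressed as a one-line check. This Python sketch uses a hypothetical function name and only models the rule stated above; it does not reflect how the broker tracks deliveries internally.

```python
def may_deliver(unacked: int, prefetch: int) -> bool:
    """Return True if another message may be delivered to the subscriber."""
    if prefetch == 0:
        return True  # a prefetch of 0 means "no limit"
    # Otherwise delivery pauses once the unacknowledged count reaches the limit.
    return unacked < prefetch
```

With the sample value of 10, a subscriber holding 10 unacknowledged messages receives nothing more until it acknowledges at least one of them.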

Custom Exchanges

The exchange option determines which exchange messages from MQTT clients are published to. If a non-default exchange is chosen then it must be created before clients publish any messages. The exchange is expected to be a topic exchange.

Retained Messages and Stores

The plugin supports retained messages. The message store implementation is pluggable, and the plugin ships with two implementations out of the box: an ETS-based (in-memory) store and a DETS-based (on-disk) store.

Both implementations have limitations and trade-offs. With the first one, the maximum number of messages that can be retained is limited by RAM. With the second one, there is a limit of 2 GB per vhost. Both are node-local (messages retained on one broker node are not replicated to other nodes in the cluster).

To configure the store, use the rabbitmq_mqtt.retained_message_store configuration key:

[{rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"amq.topic">>},
                  {subscription_ttl, 1800000},
                  {prefetch,         10},
                  %% use DETS (disk-based) store for retained messages
                  {retained_message_store, rabbit_mqtt_retained_msg_store_dets},
                  %% only used by DETS store
                  {retained_message_store_dets_sync_interval, 2000},
                  {ssl_listeners,    []},
                  {tcp_listeners,    [1883]}]}
].

The value must be a module that implements the store:

These implementations are suitable for development but may not meet production needs. The MQTT 3.1 specification does not define consistency or replication requirements for retained message stores, so RabbitMQ allows custom stores that meet the consistency and availability needs of a particular environment. For example, stores based on Riak and Cassandra would be suitable for most production environments because those data stores provide tunable consistency.

Message stores must implement the rabbit_mqtt_retained_msg_store behaviour.
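To illustrate what a retained message store has to do, here is a conceptual Python analogue of an in-memory store. It is not the plugin's API: a real store is an Erlang module, and the class and method names below (`insert`, `lookup`, `delete`) are assumptions chosen for illustration.

```python
from typing import Dict, Optional


class InMemoryRetainedStore:
    """Keeps at most one retained message per topic, entirely in RAM."""

    def __init__(self) -> None:
        self._messages: Dict[str, bytes] = {}

    def insert(self, topic: str, payload: bytes) -> None:
        # A newer retained message replaces the previous one for the same topic.
        self._messages[topic] = payload

    def lookup(self, topic: str) -> Optional[bytes]:
        # Returns None when nothing is retained for the topic.
        return self._messages.get(topic)

    def delete(self, topic: str) -> None:
        # Deleting a topic with no retained message is a no-op.
        self._messages.pop(topic, None)
```

Like the stores shipped with the plugin, this sketch is node-local: nothing is replicated, and in this case everything is lost when the process exits.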

Limitations

Overlapping Subscriptions

Overlapping subscriptions from the same client (e.g. /sports/football/epl/# and /sports/football/#) can result in duplicate messages being delivered. Applications need to account for this.
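The following Python sketch shows why such subscriptions overlap: a single published topic matches both filters. The matcher follows the usual MQTT wildcard rules (`+` for one level, `#` for the remaining levels); the function names are hypothetical.

```python
from typing import List


def filter_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            return True  # multi-level wildcard matches all remaining levels
        if i >= len(t_levels):
            return False  # the filter has more levels than the topic
        if f != "+" and f != t_levels[i]:
            return False  # literal level mismatch
    return len(f_levels) == len(t_levels)


def matching_filters(filters: List[str], topic: str) -> List[str]:
    """List every subscription filter that matches the topic."""
    return [f for f in filters if filter_matches(f, topic)]


if __name__ == "__main__":
    subs = ["/sports/football/epl/#", "/sports/football/#"]
    print(matching_filters(subs, "/sports/football/epl/arsenal"))
```

Both filters match the example topic, so a client holding both subscriptions may receive the message twice. An application that cannot tolerate duplicates needs its own deduplication, for example keyed on an application-level message identifier.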

Retained Message Stores

See Retained Messages above. Different retained message stores have different benefits, trade-offs, and limitations.