Puka – rethinking AMQP clients

I fundamentally disagree with the APIs exposed by our current AMQP client libraries.

There is a reason why they’re imperfect: we intentionally avoided innovation in APIs since the beginning. The purpose of our client libraries is to expose generic AMQP, not any one view of messaging. But, in my opinion, trying to map AMQP directly to client library APIs is just wrong and results in over-complication and abstractions that are hard to use.

There is no common ground: client libraries that blindly follow the AMQP model will be complex; easy-to-use client libraries must be opinionated.

1. Channels

The main problem with client libraries following the protocol is caused by the nature of AMQP channels. Channels are often explained as an abstraction matching an operating system thread - you may have many of those, and each one is synchronous.

That's all good, but an AMQP channel is not limited to being a thread. It’s so much more than that: an error scope, a transaction scope, an ordering guarantee and a scope for acks.

The programmer may decide to use many channels within a single thread, or the opposite: many threads may need to work on a single channel.

An example of the first situation is forwarding messages between two queues (one channel for publishing, one for consuming). An example of the second is splitting work from one channel between multiple worker threads (in order to share the basic.qos quota between workers).
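The second situation can be sketched with the standard library alone. This is a toy model, not real AMQP: `ToyChannel` and `run_workers` are hypothetical names, and a semaphore stands in for the basic.qos quota that all workers on the channel share.

```python
import queue
import threading


class ToyChannel:
    """Hypothetical stand-in for one AMQP channel with a prefetch limit."""

    def __init__(self, messages, prefetch_count):
        self._pending = queue.Queue()
        for message in messages:
            self._pending.put(message)
        # The semaphore plays the role of the basic.qos quota.
        self._quota = threading.Semaphore(prefetch_count)
        self._lock = threading.Lock()
        self._in_flight = 0
        self.max_in_flight = 0

    def get(self):
        """Return the next message, or None once the queue is drained."""
        self._quota.acquire()  # blocks while the quota is used up
        try:
            message = self._pending.get_nowait()
        except queue.Empty:
            self._quota.release()
            return None
        with self._lock:
            self._in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self._in_flight)
        return message

    def ack(self):
        """Acknowledge one message, freeing a slot in the shared quota."""
        with self._lock:
            self._in_flight -= 1
        self._quota.release()


def run_workers(messages, prefetch_count, worker_count):
    """Process all messages with several threads sharing one channel."""
    channel = ToyChannel(messages, prefetch_count)
    processed = []
    processed_lock = threading.Lock()

    def worker():
        while True:
            message = channel.get()
            if message is None:
                return
            with processed_lock:
                processed.append(message)
            channel.ack()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(processed), channel.max_in_flight


processed, max_in_flight = run_workers(range(20), prefetch_count=2, worker_count=4)
```

Four workers share a quota of two, so no more than two messages are ever unacknowledged at once, however the threads are scheduled.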

Inevitably, an author of a client library must make a decision on the relationship between a channel and a thread. It may sound boring if you’re from a .NET/Java background, since these frameworks are opinionated about threading. But assuming anything about the threading model in a third-party library is very bad practice in some languages, for example C and Python.

We can repeat almost the same discussion for the problem of handling multiple connections. For example a single thread may need to speak to two connections.

Every client library author must answer the following two questions:

  • Is it possible to run multiple synchronous methods, on multiple channels, at the same time?
  • Is it possible to run multiple connections, from a single thread?

Two questions - four possible choices:

Blocking on multiple channels   Handling multiple connections from a single thread   Resulting client
no                              no                                                   simple blocking client (pyamqplib)
no                              yes                                                  semi-asynchronous client (pika 0.5.2)
yes                             no                                                   threaded clients (rabbitmq-java, rabbitmq-dotnet)
yes                             yes                                                  fully asynchronous client (puka)

2. Error handling

The next problem is error handling. Using some of the client libraries it’s virtually impossible to catch an AMQP error and recover from it without having to restart the whole program. This is often caused by users not understanding the nature of channels as an error scope. But the libraries don't make dealing with errors easy: you get a channel error, now what? For example, doing basic.publish may kill your channel, in theory at any time.
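To make "channel as error scope" concrete, here is a small toy model. All class names are hypothetical and nothing here talks to a broker. It also simplifies one important detail: the error is raised immediately by `basic_publish`, whereas a real broker reports it asynchronously, which is exactly what makes it awkward to handle.

```python
class ChannelError(Exception):
    """A broker-reported error that closes the channel it occurred on."""

    def __init__(self, reply_code, reply_text):
        super().__init__("%d %s" % (reply_code, reply_text))
        self.reply_code = reply_code


class ToyChannel:
    """Hypothetical channel: one error closes it, and only it."""

    def __init__(self, number, exchanges):
        self.number = number
        self.is_open = True
        self._exchanges = exchanges

    def basic_publish(self, exchange, routing_key, body):
        if not self.is_open:
            raise RuntimeError("channel %d is closed" % self.number)
        if exchange not in self._exchanges:
            # Simplification: a real broker reports this asynchronously.
            self.is_open = False
            raise ChannelError(404, "NOT_FOUND - no exchange '%s'" % exchange)
        return True


class ToyConnection:
    """Hypothetical connection that hands out numbered channels."""

    def __init__(self):
        self._exchanges = {""}  # only the default exchange exists
        self._next_number = 0

    def channel(self):
        self._next_number += 1
        return ToyChannel(self._next_number, self._exchanges)


connection = ToyConnection()
publisher = connection.channel()
other = connection.channel()

try:
    publisher.basic_publish("missing", "test", b"Hello world!")
except ChannelError:
    failed = publisher
    publisher = connection.channel()  # recover on a fresh channel

delivered = publisher.basic_publish("", "test", b"Hello world!")
```

The failed channel is gone for good, the other channel on the same connection is untouched, and recovery means opening a new channel and redoing whatever state the old one carried.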

3. Synchronous publish

The last broken thing is the lack of support for synchronous publish. It wasn’t practically possible to make sure a message got delivered to the broker before RabbitMQ extended AMQP to support ‘confirms’. The only solution was to use transactions, which slowed publishing radically. Now, with ‘confirms’, it’s possible but rather hard: as well as writing a callback, a user needs to maintain a lock between the library thread and the user thread, which requires an understanding of the library's threading model.
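The following sketch shows roughly what that burden looks like. It assumes a threaded library that invokes a confirm callback on its own thread; `ToyPublisher`, `ConfirmWaiter` and `publish_sync` are hypothetical names, and a timer thread simulates the broker's confirm.

```python
import threading


class ConfirmWaiter:
    """Bridges a library-thread callback and a blocked user thread."""

    def __init__(self):
        self._cond = threading.Condition()
        self._confirmed = set()

    def on_confirm(self, delivery_tag):
        # Runs on the library's thread.
        with self._cond:
            self._confirmed.add(delivery_tag)
            self._cond.notify_all()

    def wait_for(self, delivery_tag, timeout):
        # Runs on the user's thread; True if confirmed before the timeout.
        with self._cond:
            return self._cond.wait_for(
                lambda: delivery_tag in self._confirmed, timeout)


class ToyPublisher:
    """Hypothetical library object; confirms arrive on a separate thread."""

    def __init__(self, on_confirm):
        self._on_confirm = on_confirm
        self._next_tag = 0

    def publish(self, body):
        self._next_tag += 1
        tag = self._next_tag
        # Simulate the broker's confirm arriving a little later.
        threading.Timer(0.01, self._on_confirm, args=(tag,)).start()
        return tag


def publish_sync(publisher, waiter, body, timeout=2.0):
    """Publish and block until the matching confirm has been seen."""
    tag = publisher.publish(body)
    return waiter.wait_for(tag, timeout)


waiter = ConfirmWaiter()
publisher = ToyPublisher(on_confirm=waiter.on_confirm)
confirmed = publish_sync(publisher, waiter, b"Hello world!")
```

Recording confirmed tags in a set under the condition's lock avoids losing a confirm that arrives before the user thread starts waiting. Getting that right is the user's job here, not the library's.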

The birth

Out of this frustration a new experimental Python client was born: Puka.

Puka tries to provide simple APIs to the underlying AMQP protocol and reasonable error handling. The major features of Puka:

  • Single threaded. It doesn’t make any assumptions about the underlying threading model; the user may write a thin threaded layer on top of Puka if required.
  • It’s possible to mix synchronous and asynchronous programming styles.
  • AMQP Errors are predictable and recoverable.
  • Basic.publish can be synchronous or asynchronous, as you wish.

The anti-features of Puka:

  • AMQP Channels are not exposed to the user.
  • Removed support for some AMQP features, most notably heartbeats.

Code snippets

As a teaser, here are a few code snippets.

Declare 1000 queues, one by one:

for i in range(1000):
    promise = client.queue_declare(queue='a%04i' % i)
    client.wait(promise)

Declare 1000 queues in parallel:

promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
    client.wait(promise)
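Why the second form is faster can be shown with a deliberately idealized model. `ToyClient` below is hypothetical and is not Puka: it only counts simulated network round trips, and it assumes that one round trip answers every request sent so far.

```python
class ToyClient:
    """Hypothetical client that counts simulated network round trips."""

    def __init__(self):
        self._next_promise = 0
        self._unanswered = []  # (promise, queue) pairs sent but not answered
        self._results = {}
        self.round_trips = 0

    def queue_declare(self, queue):
        # Sending is cheap: the request is written and a promise returned.
        self._next_promise += 1
        self._unanswered.append((self._next_promise, queue))
        return self._next_promise

    def wait(self, promise):
        if promise not in self._results:
            # Idealized: one round trip answers every outstanding request.
            self.round_trips += 1
            for number, queue in self._unanswered:
                self._results[number] = {"queue": queue}
            self._unanswered = []
        return self._results.pop(promise)


def declare_one_by_one(count):
    client = ToyClient()
    for i in range(count):
        client.wait(client.queue_declare(queue="a%04i" % i))
    return client.round_trips


def declare_in_parallel(count):
    client = ToyClient()
    promises = [client.queue_declare(queue="a%04i" % i) for i in range(count)]
    for promise in promises:
        client.wait(promise)
    return client.round_trips
```

In this model `declare_one_by_one(1000)` pays for 1000 round trips while `declare_in_parallel(1000)` pays for one. A real network is not that tidy, but the latency saving comes from the same place: all requests are on the wire before the first reply is awaited.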

Asynchronous publish:

client.basic_publish(exchange='', routing_key='test',
                     body="Hello world!")

Synchronous publish:

promise = client.basic_publish(exchange='', routing_key='test',
                              body="Hello world!")
client.wait(promise)

AMQP errors don’t affect other parts of your program (publishes, consumes, etc). For example, if a ‘test’ queue was already declared as ‘durable’ and you try to redeclare it without the proper flag, you’ll get an error:

> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
[...]
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
    'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
    'test' in vhost '/' not equivalent"}

In Puka you may simply catch this exception and continue:

try:
    promise = client.queue_declare(queue='test')
    client.wait(promise)
except puka.PreconditionFailed:
    # Oh, sorry. Forgot it was durable.
    promise = client.queue_declare(queue='test', durable=True)
    client.wait(promise)

You may take a look at Puka code for RabbitMQ tutorials and Puka examples and tests.

Summary

In summary, Puka provides simpler APIs, a flexible programming model and proper error handling, and doesn’t make any decisions on threading. It makes using AMQP fun again.

8 Responses to “Puka – rethinking AMQP clients”

  1. Brave Sir Robin Says:

    Interesting stuff!

    One question I've got is how do you preserve / promote the interoperability of client libraries' "high level" commands? One idea I had was to write "helper classes" to do basic messaging tasks, e.g. RPC. You'd have an RPCClient and RPCConsumer class which presented a simple, RPC-focused API that behind the scenes used AMQP to do its work. This is all well and good if you're only using one library, but if you want the producer and consumer to be implemented on different platforms you're stuck with rewriting one of the implementation classes.

  2. marek Says:

    > One question I’ve got is how do you preserve / promote the
    > interoperability of client libraries’ “high level” commands? One
    > idea I had was to write “helper classes” to do basic messaging
    > tasks, e.g. RPC. You’d have an RPCClient and RPCConsumer class
    > which presented a simple, RPC focused API that behind the
    > scenes used Amqp to do it’s work.

    RPC is actually quite tricky: every single application has different requirements (error handling and recovery requirements are different in every application). Building RPC into the client library is almost never a good idea.

    Puka still exposes only AMQP; there aren't any message-pattern-like abstractions. But Puka does hide AMQP channels, so if for some reason your specially crafted RPC or other high-level abstraction requires an unusual setup of channels, you probably won't get it working using Puka. I'm not aware of any higher-level messaging abstraction like that.

  3. Tony Garnock-Jones Says:

    I really like the use of Promises. Had you considered supporting "promise.wait()" or similar? What happens if the connection drops and is reestablished (or something similar) before the promise is waited on? What happens with broken promises? Could you integrate promises more generally into python?

  4. marek Says:

    > I really like the use of Promises.

    Thanks :)

    > Had you considered supporting “promise.wait()” or similar?

    You hit the bullseye. Yes I have considered it, and there is a good reason I chose not to, although you may call my arguments aesthetic.

    In my opinion "promise.wait()" suggests that a promise is a thing you wait on, which is true only at a high level of abstraction. What you wait on, in fact, is a socket descriptor. Thus "client.wait()" makes more sense: 'client' is actually a thin abstraction on top of a descriptor. It also makes the learning curve gentler: how would you wait for two promises at the same time? "client.wait([])" is the obvious thing; with "promise.wait" it would be messy.

    But in fact all the "waiting" forms a hierarchy: reactor.wait([clients]), client.wait([promises]), promise.wait(nothing).

    IMO: client.wait() is easier to understand, and easier to extend if you decide you have to do multiple client.waits at the same time.

    >What happens if the connection drops and is reestablished
    > (or something similar) before the promise is waited on?

    Connection reestablishment is not done; I don't know how to expose it to the user. With the current Puka API it would be quite easy to just repeat promises that were interrupted by a broken connection, for example if 'basic.publish' fails, just do it again after reconnection. But AMQP doesn't really allow everything to be repeated: what to do with missing acks? How to deal with auto-delete/exclusive resources? (All of that requires knowledge of the application.)

    I'm afraid that with AMQP the user needs to do connection reestablishment manually. (Alternatively we could track the state of everything in Puka and recreate resources on connection, which is ugly.)

    >What happens with broken promises?

    You get a promise whose result is an error. By default "client.wait()" raises this error, but you may choose not to, by doing "client.wait(raise_errors=False)". As simple as that: an error is a proper return value.

    >Could you integrate promises more generally into python?

    What do you mean?

  5. Tony Garnock-Jones Says:

    Great answers, thanks Marek.

    > > Could you integrate promises more generally into python?

    > What do you mean?

    I was curious about whether such a promise could be used for more general synchronisation after completion of some computation, i.e. integration with threads etc. Having looked at the code, I see that they're specific to AMQP operations. Designing a general-purpose Promise library for Python seems like a daunting task, so having AMQP-specific promises seems like the right thing for now :-)

  6. John Krukoff Says:

    Re: integrate promises

    Have you looked at the Python futures module, now included in the stdlib?

    http://www.python.org/dev/peps/pep-3148/

    I don't know how much more you could ask for to have them more closely integrated.

  7. Marek Majkowski Says:

    Very interesting indeed! Have you used it? Is it only in Python 3? Is there a backport for Python 2? I've raised a Puka issue for that: https://github.com/majek/puka/issues/6

  8. Jenny DeLato Says:

    We were wondering how many different applications are using Puka. We tried looking at the examples on the main documentation page, and they all seem very simplified.

    How many projects out there are using it in production? Can anyone give me some detailed examples that show how I would go about setting up 3 separate producers (different processes) and then one module that has 3 consumers (1 for each producer)? Our goal is to have a work queue, a management queue, and perhaps a topic.

    Additionally, how does a topic work in Puka?

    thank you