Menu

Posts Tagged ‘AMQP’

Ruby AMQP Benchmarks

Tuesday, March 1st, 2011

I decided to run some benchmarks of my AMQP encoder/decoder (AMQ Protocol gem) against the old one in the AMQP gem to see whether it performs better or not. So far I did only the most basic optimisations like storing reusable values in constants, nothing special (yet).

I did two sets of benchmarks: CPU time benchmarking using my fork of RBench with support for custom formatters (like writing results into a YAML file) and memory benchmarking using Object.count_objects (Ruby 1.9). (more…)

Ruby AMQP 0.7 released!

Wednesday, January 19th, 2011

I'm happy to announce that the AMQP 0.7 is released, as I promised in the previous blog post. So what are the changes?

AMQP 0.7 gem installation.

When you install the AMQP gem, you'll see changes of the current version. (How did I do that? With changelog gem and a bit of gemspec magic.)

Callback for MQ#queue

Synchronous API for Queue.Declare/Queue.Declare-Ok request/response was exposed via asynchronous callback:

channel = MQ.new
fanout  = channel.fanout(:task_fanout)
channel.queue(:tasks) do |queue, message_count, consumer_count|
  puts "Queue #{queue.name} declared!"
  puts "Message count: #{message_count}"
  puts "Consumer count: #{consumer_count}"
end

Auto-named queues & not rewritting of anonymous entities in MQ#queues and MQ#exchanges

If a queue is declared with an empty name, the broker is supposed to generate random name. In previous versions of the Ruby AMQP this wasn't supported, because the synchronous API (waiting for Queue.Declare-Ok was missing). Not anymore:

channel = MQ.new
channel.queue("") do |queue|
  puts "Queue with name #{queue.name} declared!"
end

# OUTPUT: Queue with name amq.gen-PfCGdyBA4Sr4rkZg3IN3Kw== declared!

The same should apply for exchanges, but this isn't supported by the current version of RabbitMQ.

Also, in the previous AMQP versions, the MQ#queues, MQ#exchanges and similar was just a hash, hence if given entity was anonymous (the name was nil), and if the collection already included another anonymous instance, then the one which was already in the collection was rewritten.

Callback for MQ::Queue#bind

MQ::Queue#bind can take a callback, as well as MQ#queue now can:

channel = MQ.new
fanout  = channel.fanout(:task_fanout)
channel.queue(:tasks).bind(fanout) do |queue|
  puts "Queue #{queue.name} was bound!"
end

AMQP URL

Thanks to majek, author of the Puka AMQP client for Python, you can use URL instead of option hash as an argument for AMQP.connect and AMQP.start:

AMQP.start("amqps:/")
# Will resolve to: {vhost: "/", port: 5671, ssl: true}

AMQP.start("amqp://[email protected]:1111/")
# Will resolve to: {user: "botanicus", vhost: "/", host: "localhost", port: 1111, ssl: false}

MQ::Exchange.default

The default exchange is a direct exchange with an empty name where all the queues are automatically bound (and you can't bind there anything manually). Do not confuse the default exchange with amq.direct which is only a predefined direct exchange without any "magic" abilities).

Fail if an entity is re-declared with different options

Rather than wait for the server, than if possible we let this fail on the client, so the user gets more descriptive error message:

channel = MQ.new
channel.queue(:tasks, auto_delete: true)
channel.queue(:tasks, auto_delete: false)

# Exception: There is already an instance called tasks with options
{:queue => :tasks, :nowait => true, :auto_delete => true},
you can't define the same instance with different options ({:queue => :tasks,
:nowait => true, :auto_delete => false})! (MQ::IncompatibleOptionsError)

Don't reconnect if the credentials are invalid

AMQP reconnects automatically if the connection failed. It did try to reconnect even on an error like providing invalid credentials. I changed it to register the reconnect hook after the connection is actually established, so if for whatever reason the connection fails, it won't try to reconnect.

rSpec 2 specs

This is still work in progress, you can check the spec/ directory. Huge thanks to arvicco and michaelklishin for their work on this!

Issues

We closed nearly all issues at tmm1/amqp repository. Please do not report any further bugs there, use ruby-amqp/amqp instead.

Friendlier environment for contributors

We use bundler now, so if you want to contribute or just run the tests, just clone the repo, run bundle install and voila, that's it! There's also bin/irb for easier debugging.

Speaking about them, I'd really want to thank all the contributors, their work really helped to get the AMQP gem where it is now. Since the beginning 22 people contributed to the project, and 5 of them have more than 5 commits. Check the CONTRIBUTORS file for more details!

Plans for AMQP 0.8

The next 0.8 release will bring some major API changes: there won't be two separate constants MQ and AMQP, but only the second one. The MQ class will become AMQP::Channel, so we will be compliant with the official AMQP terminology and we also want to introduce support for AMQP 0.9.1 via the AMQ-Protocol gem.

Links

Any comments, ideas? You're always welcome to drop by at Jabber MUC [email protected], and tell us what do you think!

What’s Going on with the Ruby AMQP Gem?

Wednesday, January 12th, 2011

In the past year development of the AMQP gem was practicaly stagnating, as its original author Aman Gupta (@tmm1) was busy. A lot of bugs stayed unresolved, the code was getting old and out-dated and no new features or documentation were made.

At this point I started to talk with the RabbitMQ guys about possible collaboration on this. Actually originally I contacted VMware when I saw Ezra Zygmuntowicz looking for people to his cloud team, but when I found that VMware recently acquired the RabbitMQ project in London, I got interested. I signed the contract, switched from script/console to Wireshark and the RabbitMQ Tracer and since November I've been happily hacking on the AMQP and AMQ-Protocol gems.

To introduce myself, my name's Jakub Stastny (@botanicus) and I work as a Ruby contractor. I contributed to such projects as RubyGems, Merb and rSpec and I wrote my own framework called Rango, the only Ruby framework with template inheritance. I work with Node.js as well and I created Minitest.js, BDD framework for testing asynchronous code. My other hobbies are photography and travelling.

I asked Aman if I can take over the maintainership over the AMQP gem and he was happy to do so. At this point other two guys, Michael Klishin (michaelklishin) and Ar Vicco (arvicco) showed interest in the development, so we created ruby-amqp organisation at GitHub and forked the original code there, as well as a few other related repositories. The GitHub guys were happy to make our repository to be the main one, instead of just a fork, so since now, everything will be there (except the old issues which are still on tmm1's fork and which we want to solve and close soon).

Soo What's New?

Test Suite

At the beginning, there were barely any tests at all, so it was basically impossible to tell if the changes I made break something or not. So I started to write some. In the later stage, when michaelklishin and arvicco joined the development, we rewrote the few original Bacon specs to rSpec 2 and now arvicco is porting his specs which he happened to write some time ago to the main repository. Arvicco has also written amqp-spec, superset of em-spec for testing the AMQP gem.

AMQP 0.9.1

Currently the gem speaks only AMQP 0.8, which is more than 2 years old version, so probably the most important upcoming feature is support of AMQP 0.9.1. Because this is something what can be beneficial for other clients as well, I decided to create a new library called AMQ-protocol. It's using rabbitmq-codegen as many others client libraries.

One of the main goals of this gem is to be really fast and memory-efficient (not for the sake of memory-efficiency itself, but because the garbage collector of MRI is quite weak). I'm about to create some benchmarks soon to see if the performance is better and how much.

AMQ-Protocol is still work-in-progress. It works, but it still needs some polishing, refactoring and optimizations, as well as documentation and tests.

Other Changes

I fixed a lot of bugs and I merged all the pending pull requests to the main repository. I'm going to write more about the changes once I'll release AMQP 0.7. I released 0.7.pre recently, you can try it by running gem install amqp --pre, which would be greatly appreciated. As the work on the test suite is still in progress now, the release process is kind of russian roulette at the moment.

Backward compatibility

I fixed quite a few bugs and obviously the fixed code is never backward-compatible with the old buggy one. One of the major changes is that MQ#queues (as well as MQ#fanouts etc) is not a hash anymore, but an array-like collection with hash-like behaviour. It does NOT override anonymous instances when another anonymous instance is created (as it used to do before) and it does support server-generated names. So instead of MQ#queues[nil] = <first instance> and then MQ#queues[nil] = <second instance>) it now just adds both instances to the collection and when it receives Queue.Declare-Ok from the server, it updates the name to it.

Future plans

The AMQP gem is very opinionated. If you don't want to use EventMachine, you're out of luck. You might want to use something more low-level like IO.select or just another async library like cool.io. You might not even want to care about the asynchronous code at all.

It'd be great if we could have one really un-opinionated AMQP client library which only job would be to expose low-level API defined by the AMQP protocol without any abstraction like hidding channels etc. Such library would be intended for another library implementators rather than for the end users. AMQP is a complex protocol and because of some design decisions it's pretty hard to design a good and easy-to-use (opinionated) client library for it. So some basic library which doesn't make any assumptions would help others to play around and try to implement their own, opinionated libraries on top of this one without the need to manually implement the hard stuff like encoding/decoding or basic socket communication.

Questions? Ideas? Get in touch!

Are you interested in the AMQP gem development? Do you want to participate or do you have some questions? Feel free to contact me, either by comments under this blog post, or you can drop me an e-mail to [email protected] or drop by to Jabber MUC room at [email protected] where all the current maintainers usually are. And for all the news make sure you are following me on Twitter!

AMQP 1.0 prototyping

Wednesday, December 1st, 2010

We have been prototyping support for a new protocol, as is our wont. This one is called "AMQP 1.0 R0", and it is the new issue from the AMQP working group (of which RabbitMQ, and latterly VMware, are a member). The "R0" indicates that it's the first revision of a recommendation. The specification is incomplete: there are many TODOs, and to a large extent it is unproven. Those two facts are part of what prompted this prototyping.

The prototype code is mirrored at github: http://github.com/rabbitmq/rabbitmq-amqp1.0. It is built just the same as all our plugins.

The AMQP 1.0 R0 specification differs from the specification of previous versions of AMQP, in that it does not define a broker model; i.e., it doesn't define exchanges queues and bindings, or their equivalents. The protocol is really only about transferring messages from one agent to another, and then agreeing on what the outcome was. That means it is amenable to bolting on to a message broker implementation, among other uses -- the idea is that one can adapt an existing model to suit.

In our case, the incumbent model is that of AMQP 0-9-1, with some generalisations and extensions (for example, chained bindings). Our target with the prototype is therefore to be able to get something useful done with both 1.0 clients and 0-9-1 clients connected at the same time.

Well, the good news is, we've achieved that. In fact the plugin can be set up to replace Rabbit's usual network listener, and will happily talk to AMQP 0-8, 0-9-1, and 1.0 clients. We did have to do some invention along the way, and there are some parts of the specification that we are conspicuously not implementing. These will be detailed in the README soon.

One large part of the invention is to fill in semantics where the specification is silent. Some of these are detailed in this client-broker protocol work we did for the AMQP working group. We're hoping the prototyping will help fill this out some more.

Next week I'll be taking our prototype to the AMQP 1.0 "Connectathon", where it'll be tested against other implementations of the core protocol (not all of which are open source). Again, this will help to flush out barriers to interoperability in the specification.

Well, I’ll let you go … basic.reject in RabbitMQ

Tuesday, August 3rd, 2010

Support for AMQP's basic.reject just landed on default. It's taken this long because we couldn't agree on a single set of semantics that followed the specification, was actually useful, and wasn't too complicated to implement.

First up, this is what RabbitMQ's basic.reject will do: if you supply requeue=false it will discard the message (this is in lieu of dead-lettering it, until we have a dead letter feature); if you supply requeue=true, it will release it back on to the queue, to be delivered again.

The first is useful from a error handling point of view; if your application cannot process a particular message, you can get rid of it. At the minute, it's semantically the same as just acking the message; but, given a dead-letter mechanism, it will mean unprocessable messages can be picked up elsewhere for diagnosis.

The second, with requeue=true, is useful for example if your application needs a "message lock" semantics. In this scenario, a consumer can be delivered a message, then decide not to deal with it after all, and place it back on the queue. Note that RabbitMQ doesn't take care to stop the same consumer getting the message again -- see below.

The AMQP 0-9-1 specification says a number of seemingly incompatible things about basic.reject. For a start, it says in the method description

The client MUST NOT use this method as a means of selecting messages to process.

and in the specification XML (but not in the generated PDF),

The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage.

This seems to suggest that the server has to take care not to deliver the message to the same consumer twice, but consumers are not allowed to rely on this prohibition. This means basic.reject could either redeliver the message or dead-letter it, which makes it useless for the "message lock" scenario given above.

So, we have chosen to implement the simplest thing that is useful, which is to re-enqueue the message and treat it as though it were completely new. This means the consumer can receive again a message it has rejected.