

Homebrew and RabbitMQ Plugins

Since there is no plugin option in Homebrew's install of RabbitMQ, it can be hard to find out how to install RabbitMQ plugins. Assuming you used the default location of /usr/local for your Homebrew setup, you only need to copy the .ez files for the plugins to the following directory:

/usr/local/Cellar/rabbitmq/2.4.1/lib/rabbitmq/erlang/lib/rabbitmq-2.4.1/plugins

In my case, I downloaded the plugins to my Downloads folder (~/Downloads) and issued the following command:

mv ~/Downloads/*ez /usr/local/Cellar/rabbitmq/2.4.1/lib/rabbitmq/erlang/lib/rabbitmq-2.4.1/plugins

Once you’ve moved the files there, you’ll need to restart the RabbitMQ server to have them loaded. If you used the launchctl instructions during the RabbitMQ brew install for starting and adding the service, you should be able to issue two commands in terminal:

launchctl stop com.rabbitmq.rabbitmq-server
launchctl start com.rabbitmq.rabbitmq-server

That’s all there is to it. If you installed the Management plugin, you can now test by going to http://localhost:55672.
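If you would rather script the move than type it, the copy step can be sketched in Python. This is a minimal sketch: the function name install_plugins is hypothetical, and it assumes both directories already exist.

```python
import shutil
from pathlib import Path


def install_plugins(download_dir, plugins_dir):
    """Move every .ez archive from download_dir into plugins_dir.

    Hypothetical helper; returns the sorted list of file names moved.
    """
    download_dir = Path(download_dir).expanduser()
    plugins_dir = Path(plugins_dir).expanduser()
    if not plugins_dir.is_dir():
        raise FileNotFoundError("plugins directory not found: %s" % plugins_dir)

    moved = []
    for archive in sorted(download_dir.glob("*.ez")):
        # Same effect as `mv` for a single file.
        shutil.move(str(archive), str(plugins_dir / archive.name))
        moved.append(archive.name)
    return moved
```

Calling it with "~/Downloads" and the plugins directory shown above would mirror the mv command. You would still need to restart RabbitMQ afterwards.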

Deeper down the rabbit hole of message (re)delivery

I've posted in the past about RabbitMQ and how to get started with it. I thought I'd follow up with something that's a little deeper, but still pretty entry level from a usage perspective. I've been spending a lot of time with RabbitMQ lately, as I'm now the active maintainer of Pika, the Python package for interfacing with it. Part of my diving deep into Pika has been becoming better versed in the AMQP standard.

AMQP, or the Advanced Message Queuing Protocol, is a protocol standard developed by a consortium of organizations and developers including Rabbit Technologies (now VMware/Spring), Red Hat, JPMorgan Chase, Cisco and others. While RabbitMQ supports multiple protocols including STOMP and HTTP, AMQP is its primary protocol and strongly defines RabbitMQ's functional behavior, even when using other protocols. This strong tie between RabbitMQ and AMQP has led me to the conclusion that to fully understand RabbitMQ you first need to fully understand AMQP.

To that end, RabbitMQ has provided an excellent yet terse AMQP and API Quick Reference.  For the purposes of the information below I will be referencing AMQP version 0-9-1 and RabbitMQ 2.3.1 only.

Going down the Rabbit Hole

In this post I wanted to focus on how consumers can work with RabbitMQ to ensure that when messages are not processed correctly in client applications, they do not get dropped on the floor. To get there, let me first explain the different connotations of the term "message" when talking about RabbitMQ and AMQP.

The AMQP protocol specifies the semantics for sending different types of protocol level messages by categorizing the type of message by class and method in Class.Method format. Examples of this include Queue.Declare and Basic.Publish. If you're a Pika user you may know these as Channel.queue_declare() and Channel.basic_publish(). When I refer to a message by a name in Class.Method format, I am referring to the protocol message.
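To make the naming relationship concrete, here is a small sketch that turns a Class.Method name into the snake_case form used in the two Pika examples above. The helper name is hypothetical, and the convention is only inferred from those two examples; not every AMQP method necessarily has a client counterpart.

```python
def pika_method_name(amqp_name):
    """Convert 'Class.Method' into 'class_method'.

    Assumed convention, inferred from Queue.Declare -> queue_declare
    and Basic.Publish -> basic_publish.
    """
    class_name, method_name = amqp_name.split(".")
    return "%s_%s" % (class_name.lower(), method_name.lower())
```

For example, pika_method_name("Basic.Ack") gives "basic_ack", which matches the acknowledgment call used later in this post.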

The class Basic encompasses most of the functionality related to sending and receiving application level messages. When we use Basic.Publish we are constructing a protocol level message with the application level message as a payload. It is the application level message that is received by the consumer application as a combination of a header and body. 

Instead of focusing on how different AMQP and RabbitMQ clients are used from a programmer's perspective, I will attempt to primarily reference AMQP protocol level constructs with few references to client implementations.

Message Lifecycle

To illustrate the lifecycle of sending and receiving an application level message, the following steps are taken with a client application using the Basic.Consume method of receiving messages:
  1. Publishing application calls a library method for Basic.Publish with the routing information, the application level message and its properties as defined by the properties attribute of the Basic class. The client library constructs the Basic.Publish message and delivers it to the RabbitMQ server.

    Example of publishing a message with Pika:
    properties = BasicProperties(timestamp=time.time(),
                                 content_type="text/plain",
                                 delivery_mode=1)
    
    channel.basic_publish(exchange='example',
                          routing_key="example.test",
                          body=message,
                          properties=properties)
  2.  The server receives the Basic.Publish message and routes it into the appropriate queue as specified by the exchange and routing key.
  3.  Assuming that our client application is operating properly and RabbitMQ is ready to deliver it a message, when the message published in Step #1 is ready to be delivered**, RabbitMQ constructs a Basic.Deliver message with the payload of the Basic.Publish message it received in step #2.
  4.  The consuming application's RabbitMQ client library will receive the message, decode it and surface three distinct parts of the protocol message to the consuming application:
    1.  The Method frame defines the core information about the protocol message. This includes the channel number the message was delivered on and the delivery-tag for the message.  The delivery-tag in conjunction with the current connection and channel number uniquely identifies the message when communicating with RabbitMQ.
    2.  The Header frame defines the properties of the application message. In the case of our example from step #1, this includes the timestamp and content_type as well as properties about the body frame such as the content length.
    3.  The Body frame is the raw application level message data as assigned to the body parameter of channel.basic_publish() in step #1.
  5.  Assuming that the client application did not specify the "no-ack" option when it set itself up as a consumer, the next step is for the client application to send a Basic.Ack message with the delivery-tag received in the method frame in step #4. This lets RabbitMQ know the client application has received the message and RabbitMQ may now delete the message from its stack.
    Example of sending a message acknowledgment with Pika:
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 * This workflow is based upon the Basic.Consume method of consuming messages; the flow is slightly different when using Basic.Get.

** I am not deeply familiar with internal RabbitMQ application architecture and flow. Steps that illustrate what RabbitMQ is doing internally may not accurately represent the internal RabbitMQ flow and are only meant to illustrate what is happening at a logical level of interfacing with RabbitMQ.
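Steps 4 and 5 can be sketched as a single consumer callback. This is a minimal sketch, not a complete consumer: it assumes a Pika-style channel object with basic_ack(), frame objects exposing delivery_tag and content_type, and a hypothetical application function called process.

```python
def handle_delivery(channel, method_frame, header_frame, body, process):
    """Process one delivered message, then acknowledge it.

    `process` is a hypothetical application function; the frame
    arguments mirror the three parts surfaced in step 4.
    """
    # Method frame: delivery metadata, including the delivery-tag.
    delivery_tag = method_frame.delivery_tag
    # Header frame: application message properties such as content_type.
    content_type = header_frame.content_type
    # Body frame: the raw payload that was handed to basic_publish().
    process(content_type, body)
    # Step 5: tell RabbitMQ the message has been handled.
    channel.basic_ack(delivery_tag=delivery_tag)
    return delivery_tag
```

Acknowledging only after process() returns means an exception in processing leaves the message unacknowledged, which is the situation the next sections deal with.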

Dealing with Rejection

With step #5 we have concluded the lifespan of a properly consumed message and this covers a majority of the consuming behavior except when there is an exception in processing. In the event that a client application is unable to process a message and is able to let RabbitMQ know of the error, it should respond with either a Basic.Reject or Basic.Nack message.

Basic.Reject is specified in AMQP 0-9-1 with conflicting rules. Due to this conflict, RabbitMQ implements Basic.Reject with a behavior that does not match the exact behavior from the specification. The specification states that the client should not use Basic.Reject as a way to keep the broker from redelivering the message to the rejecting client and then in contradiction specifies that the broker should not redeliver the same message to a consumer who rejects it.  

If the requeue parameter is True when calling Basic.Reject, RabbitMQ will treat the rejected message as a new message in the queue and may deliver it again to the consumer who rejected it. If the requeue parameter is False, RabbitMQ will delete the message from its stack. For more information on the Basic.Reject implementation in RabbitMQ, check out the RabbitMQ blog post on the subject.

Example rejecting a message with Pika:

channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)

Basic.Nack is a RabbitMQ-only extension of the AMQP protocol and expands upon the Basic.Reject behavior in a few ways. The important distinction in this context is that Basic.Nack can reject more than one message at a time: with its multiple flag set, it covers every unacknowledged message up to and including the given delivery-tag. As a side note, Basic.Nack is also implemented in RabbitMQ as a mechanism for the server to reject a Basic.Publish message sent from a publishing client. An example use case for server to client Basic.Nack is when the user_id property of a message sent with Basic.Publish does not match the authenticated user for the connection.

Example of sending Basic.Nack with Pika:

channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
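The way Basic.Nack covers several messages is driven by its multiple flag. The toy function below models which delivery-tags on a channel a single Basic.Nack would cover; it is an illustration of the documented semantics, not RabbitMQ's implementation, and the function name is hypothetical.

```python
def tags_covered(unacked_tags, delivery_tag, multiple=False):
    """Return the delivery-tags one Basic.Nack would cover.

    unacked_tags: tags delivered on this channel and not yet settled.
    With multiple=True, every tag up to and including delivery_tag is
    covered; a delivery_tag of 0 then means all outstanding messages.
    """
    if multiple:
        if delivery_tag == 0:
            return sorted(unacked_tags)
        return sorted(t for t in unacked_tags if t <= delivery_tag)
    # Without the flag, only the named message is affected.
    return [delivery_tag] if delivery_tag in unacked_tags else []
```

With Pika, the corresponding call would add multiple=True to the basic_nack() example above.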

Since RabbitMQ currently does not have a built-in dead-letter system for messages rejected with requeue=False via either Basic.Reject or Basic.Nack, I use a dead-letter queue of my own for rejected messages. With this approach, I send a Basic.Reject message with requeue=False and then a Basic.Publish with the message to the dead-letter queue.
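A minimal sketch of that reject-then-republish pattern follows. It assumes a Pika-style channel, and the exchange name "dead-letter" and routing key "rejected" are hypothetical; they would have to be declared and bound to a queue beforehand.

```python
def dead_letter(channel, method_frame, header_frame, body,
                exchange="dead-letter", routing_key="rejected"):
    """Reject a message without requeueing, then republish it.

    The exchange and routing_key defaults are placeholders, not names
    RabbitMQ provides.
    """
    # Tell RabbitMQ to drop its copy instead of redelivering it.
    channel.basic_reject(delivery_tag=method_frame.delivery_tag,
                         requeue=False)
    # Preserve the payload and its properties in the dead-letter queue.
    channel.basic_publish(exchange=exchange,
                          routing_key=routing_key,
                          body=body,
                          properties=header_frame)
```

The order here follows the description above. If the consumer dies between the two calls the message is lost; publishing first and rejecting second would trade that risk for a possible duplicate.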

Resurfacing messages in limbo

Rejecting messages is a great way to deal with messages you knowingly cannot process in your client application, but what happens when a message makes your consumer unintentionally die? In any scenario where you cannot issue a Basic.Ack, Basic.Nack or Basic.Reject to the RabbitMQ server and messages become stuck without redelivering, you can tell RabbitMQ to redeliver them.

Once your application is consuming, you send a Basic.Recover message to RabbitMQ and it will start delivering you messages that have yet to be acknowledged. If you set the requeue flag to True, it will deliver to any consumer who is eligible for the message, and if you set it to False it will only try to redeliver it to the consumer it originally sent it to, if it can.

Example of this when using Pika:

channel.basic_recover()

Taking this ability to the next level, your application may want to issue a passive Queue.Declare which will return the count of pending messages in the queue to your application. If you notice that you're not able to consume some messages and the pending message count is consistent, even when you're sitting mostly idle, you could issue a Basic.Recover message to try and unstick them.
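That check could be sketched roughly as follows. This assumes a blocking-style Pika channel where queue_declare() returns the Queue.DeclareOk frame with a method.message_count attribute; with other adapters or clients the count is surfaced differently. The function name and the idea of comparing against a previous count are illustrative assumptions.

```python
def recover_if_stuck(channel, queue_name, previous_count):
    """Issue Basic.Recover when the pending count has not moved.

    Returns the current count so the caller can pass it back in on
    the next check.
    """
    # passive=True checks the queue without creating it.
    result = channel.queue_declare(queue=queue_name, passive=True)
    current_count = result.method.message_count
    if current_count > 0 and current_count == previous_count:
        # Ask RabbitMQ to redeliver unacknowledged messages.
        channel.basic_recover(requeue=True)
    return current_count
```

You would call this periodically while idle, feeding each returned count into the next call.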

I hope this was useful in understanding not only how to deal with failures in message processing, but also has given you a glimpse into the underlying AMQP protocol. Want to dive deeper? Check out RabbitMQ's 0-9-1 reference, which is clearer than the protocol spec. Did you not follow what I meant about a passive Queue.Declare? It's okay, I'd like to explain more. Let me know what parts of RabbitMQ and AMQP I should explain in more detail in a future blog post.

The Attention Deficit Disorder Guide to RabbitMQ

RabbitMQ has been one of my interests of late, as I’ve identified it as part of our technology path at work. There are other very good resources that dive pretty deep in RabbitMQ and how to use it. The goal of this guide is to help you get on your feet quickly and easily. It assumes a couple of things:

  • You already know about message queues and have some experience or knowledge on the subject.
  • You know what AMQP is.
  • You are already interested in RabbitMQ enough to try it out.

If you’re good on those things, let’s get started…

RabbitMQ is written in Erlang. As such, you should have already downloaded and installed Erlang as a first step.

Download RabbitMQ and install it, which is pretty easy. I like to set up RabbitMQ in an /opt/rabbitmq directory. To do that, I set some environment variables before compiling (bash assumed):

export TARGET_DIR=/opt/rabbitmq
export SBIN_DIR=/opt/rabbitmq/sbin
export MAN_DIR=/opt/rabbitmq/man

Then I compile and install with “make install.” Because I like to run as my own user or a service user, I’ll chown -R myuser /opt/rabbitmq as appropriate.

There are a few other things we need to do including make the log directory and the directory RabbitMQ will use to store its data:

mkdir /var/log/rabbitmq
chown myuser /var/log/rabbitmq
mkdir /var/lib/rabbitmq
chown myuser /var/lib/rabbitmq

Now as “myuser” we can “cd /opt/rabbitmq/sbin” and run “./rabbitmq-server” and what you should see is:

RabbitMQ 1.6.0 (AMQP 8-0)
Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
Licensed under the MPL. See http://www.rabbitmq.com/

node  : rabbit@binti
log  : /var/log/rabbitmq/rabbit.log
sasl log  : /var/log/rabbitmq/rabbit-sasl.log
database dir: /var/lib/rabbitmq/mnesia/rabbit

starting database …done
starting core processes …done
starting recovery …done
starting persister …done
starting guid generator …done
starting builtin applications …done
starting TCP listeners …done

If you have the hang of starting RabbitMQ and now want to run it in the background, instead do: “./rabbitmq-server -detached”.

Once we’ve gotten this far, we’ve got our broker up and running and now we’ll need some way to talk to it. For the purposes of this article, I’m going to talk about amqplib and Python. There are AMQP libraries for just about every relevant language at this point. RabbitMQ 1.6.0 implements the AMQP 0.8 standard. The easiest way to install amqplib is a simple “easy_install amqplib”.

But before we dive into code, there are a few key concepts we need to talk about:

Queues: You should get these already: one puts a message in a queue and a consumer app receives it somewhere else.

Exchanges: These are a little more tricky than queues. I like to think of them as namespaces. One of the keen things about RabbitMQ exchanges is that different exchanges will get a different Erlang process, which should help make better use of your available hardware resources. There are three types of exchanges that we need to talk about:

Direct: a direct exchange means that when you put a message in, it is routed only to the queue whose binding matches the routing key exactly, so in the simple case one consumer is all that will get that message.

Fanout: a fanout exchange sends your message to every consumer that is listening to a particular exchange / queue combination.

Topic Exchange: this type of exchange allows you to do neat things like listen to the same queue across exchanges on one consumer, multiple queues in one namespace in a consumer and other wildcard type trickery.

Bindings: In RabbitMQ you bind your exchanges and queues together in unique combinations which determine how messages are routed to what consumers.
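To see how exchange types and bindings interact, here is a toy router in plain Python. It models the AMQP matching rules (exact match for direct, everything for fanout, "*" for one word and "#" for zero or more words in topic); it is an illustration only and says nothing about how RabbitMQ implements routing internally. The function names are hypothetical.

```python
def topic_matches(pattern, routing_key):
    """AMQP topic matching: '*' is exactly one word, '#' zero or more."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may absorb no words, or one word and then try again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the queue names a message would be routed to.

    bindings: list of (queue_name, binding_key) pairs for one exchange.
    """
    if exchange_type == "fanout":
        matched = [q for q, _ in bindings]
    elif exchange_type == "direct":
        matched = [q for q, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [q for q, key in bindings if topic_matches(key, routing_key)]
    else:
        raise ValueError("unsupported exchange type: %s" % exchange_type)
    # A queue gets the message once, even if several bindings match.
    return sorted(set(matched))
```

With bindings such as ("messages", "test.messages") and ("audit", "test.#"), a topic exchange would route the key "test.messages" to both queues, while a direct exchange would route it only to "messages".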

Memory: As of RabbitMQ 1.6.0 all messages are kept in memory. If you have nothing consuming your messages and you send too many of them, you’ll run out of memory.

Monitoring: The main install has the app rabbitmqctl, which you can use to inspect the various parts of RabbitMQ. This isn’t very good for remote monitoring or visualization. For that there’s a great project called Alice, which is also Erlang-based.

Speed: There are two ways to get messages from RabbitMQ: basic_get and basic_consume.

basic_get is where your app, on a message by message basis, asks RabbitMQ for a message. This is the slower of the two methods and will not allow single consumer applications to scale to a very high transaction rate.  Note that RabbitMQ will not register these connections as a consumer and you will not see them in list_queues or in Alice as such.

basic_consume is where your app registers itself with RabbitMQ as a consumer and RabbitMQ will send messages to you as fast as you’re able to consume them.
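For contrast with basic_consume, polling with basic_get looks roughly like the sketch below. It assumes an amqplib-style channel whose basic_get() returns a message object with a body attribute, or None when the queue is empty; the handle callback and the limit parameter are hypothetical.

```python
def drain_queue(channel, queue_name, handle, limit=100):
    """Poll a queue with basic_get until it is empty or limit is hit.

    handle is a hypothetical application callback taking the body.
    Returns the number of messages fetched.
    """
    fetched = 0
    while fetched < limit:
        # One request/response round trip per message.
        message = channel.basic_get(queue_name, no_ack=True)
        if message is None:
            break  # nothing waiting right now
        handle(message.body)
        fetched += 1
    return fetched
```

The per-message round trip is what makes this the slower of the two approaches.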

Durability: If you want to have the definitions of your queues and exchanges hang around if you have to restart RabbitMQ you need to define them as durable.

Auto-Delete: If you want your queues and exchanges to exist even when there are no consumers waiting for messages on them, you need to turn auto-delete off.

Persistence: If you do not tell RabbitMQ that you want it to hang on to your messages if it reboots, it will not do so. You must set the delivery mode of a message to “2” to tell it to persist it until it is consumed.
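Durability and persistence work together, which is easy to miss. The toy model below shows what would be left after a restart under the rules described above: non-durable queues disappear, and within durable queues only messages with delivery mode 2 remain. It is a simplified illustration with hypothetical names, not a description of RabbitMQ internals.

```python
def simulate_restart(queues):
    """Return what is left after a broker restart in this toy model.

    queues maps queue name -> {"durable": bool,
                               "messages": [(body, delivery_mode), ...]}
    """
    survivors = {}
    for name, queue in queues.items():
        if not queue["durable"]:
            continue  # the queue definition itself is gone
        kept = [(body, mode) for body, mode in queue["messages"] if mode == 2]
        survivors[name] = {"durable": True, "messages": kept}
    return survivors
```

In other words, a persistent message published to a non-durable queue is still lost in this model, because the queue holding it goes away.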

Auto-Ack: You can tell RabbitMQ to automatically acknowledge receipt of a message, or you can do it yourself. This is a boolean setting that you use when you’re consuming messages via basic_get or basic_consume.

Queue and Exchange definitions: By default, queues and exchanges do not exist until you connect a consumer to them. You can cheat and do this in your code that enqueues your messages.

Now that we have that out of the way, here’s some sample Consumer code:

#!/usr/bin/env python
""" Sample Consumer Code """

import amqplib.client_0_8 as amqp
# This is the function that basic_consume will send messages to                               
def process_message( message ):
    """ Callback function used by channel.basic_consume """
    print('Received: %s' % message.body)

# Rabbit Server to connect to
host = '127.0.0.1'
port = 5672

# Exchange and queue information
exchange_name = 'test'
exchange_type = 'direct'
queue_name = 'messages'
routing_key = 'test.messages'

# Let's set this up by default, we'll use it later
process_messages = True

# Connect to Rabbit
connection= amqp.Connection( host ='%s:%s' % ( host, port ),
                        userid = 'guest',
                        password = 'guest',
                        ssl = False,
                        virtual_host = '/' )

# Create a channel to talk to Rabbit on
channel = connection.channel()

# Create our exchange
channel.exchange_declare( exchange = exchange_name, 
                          type = exchange_type, 
                          durable = True,
                          auto_delete = False )
                                       
# Create our Queue
channel.queue_declare( queue = queue_name , 
                       durable = True,
                       exclusive = False, 
                       auto_delete = True )
            
# Bind to the Queue / Exchange
channel.queue_bind( queue = queue_name, 
                    exchange = exchange_name,
                    routing_key = routing_key )

# Let AMQP know to send us messages
consumer_tag = channel.basic_consume( queue = queue_name, 
                                      no_ack = True,
                                      callback = process_message )

# Loop while process_messages is True
while process_messages:

    # Wait for a message
    channel.wait()            
            
# Close the channel
channel.close()

# Close our connection
connection.close()
            
# This might go somewhere like a signal handler. Define it before the
# main loop above if you want a handler to be able to call it.
def cancel_processing():
    """ Stop consuming messages from RabbitMQ """
    global channel, consumer_tag, process_messages
    
    # Do this so we exit our main loop
    process_messages = False
    
    # Tell the channel you don't want to consume anymore
    channel.basic_cancel( consumer_tag )

Note that a lot of what is in that example is comments and whitespace for ease of reading; the actual implementation is pretty darn simple.
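One way the cancel logic could be wired to a signal is sketched below. It swaps the module globals for a small state object; the class and function names are hypothetical, and it assumes a channel with basic_cancel() as in the consumer above. Whether it is safe to call basic_cancel() from inside a handler while channel.wait() is blocked depends on the client library, so a more cautious variant would only flip the flag.

```python
import signal


class ConsumerState(object):
    """Holds the flag the main loop checks, instead of module globals."""

    def __init__(self, channel, consumer_tag):
        self.channel = channel
        self.consumer_tag = consumer_tag
        self.process_messages = True

    def cancel(self, signum=None, frame=None):
        """Signal-handler compatible: stop the loop, cancel the consumer."""
        self.process_messages = False
        self.channel.basic_cancel(self.consumer_tag)


def install_handler(state, signum=signal.SIGTERM):
    """Register state.cancel so the given signal stops consuming."""
    signal.signal(signum, state.cancel)
```

The main loop would then read "while state.process_messages:" in place of the global flag.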

Now that we have a consumer going let’s send some messages in:

#!/usr/bin/env python
import amqplib.client_0_8 as amqp

# Connect
connection = amqp.Connection( host = "localhost:5672", 
                              userid = "guest", 
                              password = "guest", 
                              virtual_host = "/", 
                              insist = False )

# Create our channel
channel = connection.channel()

""" We've already declared our queue, exchange and binding in our consumer so just send the messages """
for i in range(0, 10):
        message = amqp.Message("Test message %i!" % i)
        message.properties["delivery_mode"] = 2
        channel.basic_publish( message, 
                               exchange = "test", 
                               routing_key = "test.messages")

That’s it! If we did this right, you’ve now set up RabbitMQ, sent some messages and consumed them on the other end of the pipe.

If I’ve kept you this long and you’re still interested, but still have questions, I highly recommend this article which goes much more in depth and has been a valuable guide for me.

If you’re into both python and RabbitMQ, you might want to check out my consumer framework “rejected.py,” it’s on GitHub.

I hope you enjoyed the first of my A.D.D. Guides. I’d be happy to answer any questions and would appreciate feedback so I may improve this and future articles to come.