I've posted in the past about RabbitMQ and how to get started with it. I thought I'd follow up with something a little deeper, but still fairly entry-level from a usage perspective. I've been spending a lot of time with RabbitMQ lately, as I'm now the active maintainer of Pika, the Python package for interfacing with it. Part of my deep dive into Pika has been becoming better versed in the AMQP standard.
AMQP, the Advanced Message Queuing Protocol, is a protocol standard developed by a consortium of organizations and developers including Rabbit Technologies (now VMware/Spring), Red Hat, JPMorgan Chase, Cisco and others. While RabbitMQ supports multiple protocols including STOMP and HTTP, AMQP is its primary protocol and strongly defines RabbitMQ's functional behavior, even when other protocols are used. This strong tie between RabbitMQ and AMQP has led me to the conclusion that to fully understand RabbitMQ, you first need to fully understand AMQP.
To that end, RabbitMQ has provided an excellent yet terse AMQP and API Quick Reference. For the purposes of the information below, I will be referencing AMQP version 0-9-1 and RabbitMQ 2.3.1 only.
Going down the Rabbit Hole
In this post I want to focus on how consumers can work with RabbitMQ to ensure that messages which are not processed correctly in client applications do not get dropped on the floor. To get there, let me first explain the different connotations of the term "message" when talking about RabbitMQ and AMQP.
The AMQP protocol specifies the semantics for sending different types of protocol-level messages, categorizing each type of message by class and method in Class.Method format. Examples include Queue.Declare and Basic.Publish. If you're a Pika user, you may know these as Channel.queue_declare() and Channel.basic_publish(). When I refer to a message in the context of something in Class.Method format, I am referring to the protocol message.
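On the wire, each Class.Method name corresponds to a pair of numeric IDs: a method frame's payload begins with a 16-bit class-id followed by a 16-bit method-id. The sketch below hard-codes a few pairs from the AMQP 0-9-1 specification (plus RabbitMQ's Basic.Nack extension) and shows that four-byte prefix. The helper names are hypothetical, and the snake_case mapping only mirrors the naming convention of the Pika methods mentioned above; it is not a Pika API.

```python
import struct

# A few (class-id, method-id) pairs from the AMQP 0-9-1 specification.
METHOD_IDS = {
    "Queue.Declare": (50, 10),
    "Basic.Consume": (60, 20),
    "Basic.Publish": (60, 40),
    "Basic.Deliver": (60, 60),
    "Basic.Get": (60, 70),
    "Basic.Ack": (60, 80),
    "Basic.Reject": (60, 90),
    "Basic.Recover": (60, 110),
    "Basic.Nack": (60, 120),  # RabbitMQ extension
}


def method_prefix(name: str) -> bytes:
    """First four bytes of a method frame payload: class-id, then method-id,
    each an unsigned 16-bit big-endian integer."""
    class_id, method_id = METHOD_IDS[name]
    return struct.pack(">HH", class_id, method_id)


def pika_style_name(name: str) -> str:
    """Map 'Class.Method' to Pika's usual snake_case, e.g. 'queue_declare'."""
    cls, method = name.split(".")
    return f"{cls.lower()}_{method.lower()}"


print(method_prefix("Basic.Publish").hex())  # 003c0028
print(pika_style_name("Queue.Declare"))      # queue_declare
```

The snake_case convention holds for the methods discussed in this post, but it is a convention rather than a guarantee, so check Pika's channel documentation for anything else.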
The Basic class encompasses most of the functionality related to sending and receiving application-level messages. When we use Basic.Publish, we are constructing a protocol-level message with the application-level message as its payload. It is the application-level message that the consuming application receives, as a combination of a header and a body.
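To make the split between protocol-level and application-level messages concrete, here is a minimal sketch of the three frames a client writes for one Basic.Publish, following the general AMQP 0-9-1 frame layout (type, channel, payload size, payload, frame-end octet 0xCE). Only three Basic properties are encoded, the body is assumed to fit in a single body frame (real clients split large bodies to respect the negotiated frame-max), and `publish_frames` is a hypothetical helper, not part of Pika.

```python
import struct

FRAME_METHOD, FRAME_HEADER, FRAME_BODY = 1, 2, 3
FRAME_END = b"\xce"
BASIC_CLASS_ID, PUBLISH_METHOD_ID = 60, 40

# Property-flag bits for the three Basic properties this sketch supports.
FLAG_CONTENT_TYPE, FLAG_DELIVERY_MODE, FLAG_TIMESTAMP = 0x8000, 0x1000, 0x0040


def frame(frame_type: int, channel: int, payload: bytes) -> bytes:
    """type (octet), channel (short), payload size (long), payload, frame-end."""
    return struct.pack(">BHI", frame_type, channel, len(payload)) + payload + FRAME_END


def shortstr(value: str) -> bytes:
    """AMQP short string: one length octet followed by at most 255 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">B", len(data)) + data


def publish_frames(channel: int, exchange: str, routing_key: str, body: bytes,
                   content_type=None, delivery_mode=None, timestamp=None):
    # Method frame (the protocol-level message): Basic.Publish ids, a reserved
    # short, exchange, routing-key, then mandatory/immediate bits in one octet.
    method = (struct.pack(">HHH", BASIC_CLASS_ID, PUBLISH_METHOD_ID, 0)
              + shortstr(exchange) + shortstr(routing_key) + b"\x00")

    # Content header frame: class-id, weight (always 0), body size, property
    # flags, then the properties that are set, in flag order (highest bit first).
    flags, props = 0, b""
    if content_type is not None:
        flags |= FLAG_CONTENT_TYPE
        props += shortstr(content_type)
    if delivery_mode is not None:
        flags |= FLAG_DELIVERY_MODE
        props += struct.pack(">B", delivery_mode)
    if timestamp is not None:
        flags |= FLAG_TIMESTAMP
        props += struct.pack(">Q", timestamp)  # integer seconds
    header = struct.pack(">HHQH", BASIC_CLASS_ID, 0, len(body), flags) + props

    # Body frame: the application-level message bytes, untouched.
    return [frame(FRAME_METHOD, channel, method),
            frame(FRAME_HEADER, channel, header),
            frame(FRAME_BODY, channel, body)]


frames = publish_frames(1, "example", "example.test", b"hello",
                        content_type="text/plain", delivery_mode=1, timestamp=0)
print([f[0] for f in frames])  # [1, 2, 3]
```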
Instead of focusing on how different AMQP and RabbitMQ clients are used from a programmer's perspective, I will primarily reference AMQP protocol-level constructs, with few references to client implementations.
Message Lifecycle
To illustrate the lifecycle of sending and receiving an application-level message, here are the steps taken when a client application uses the Basic.Consume method of receiving messages*:
1. The publishing application calls a library method for Basic.Publish with the routing information, the application-level message and its properties, as defined by the properties attribute of the Basic class. The client library constructs the Basic.Publish message and delivers it to the RabbitMQ server.
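Example of publishing a message with Pika:

```python
import time

import pika

# Assumes `channel` is an open Pika channel and `message` holds the body to send.
# AMQP timestamps are whole seconds, so time.time() is truncated to an int.
properties = pika.BasicProperties(timestamp=int(time.time()),
                                  content_type="text/plain",
                                  delivery_mode=1)  # 1 = non-persistent
channel.basic_publish(exchange='example',
                      routing_key="example.test",
                      body=message,
                      properties=properties)
```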
2. The server receives the Basic.Publish message and routes it into the appropriate queue, as specified by the exchange and routing key.
3. Assuming that our client application is operating properly and RabbitMQ is ready to deliver it a message, when the message published in step #1 is ready to be delivered**, RabbitMQ constructs a Basic.Deliver message with the payload of the Basic.Publish message it received in step #2.
4. The consuming application's RabbitMQ client library receives the message, decodes it and surfaces three distinct parts of the protocol message to the consuming application:
   - The Method frame defines the core information about the protocol message. This includes the channel number the message was delivered on and the delivery-tag for the message. The delivery-tag, in conjunction with the current connection and channel number, uniquely identifies the message when communicating with RabbitMQ.
   - The Header frame defines the properties of the application message. In the case of our example from step #1, this includes the timestamp and content_type, as well as information about the body frame, such as its content length.
   - The Body frame is the raw application-level message data, as assigned to the body parameter of channel.basic_publish() in step #1.
5. Assuming that the client application did not specify the "no-ack" option when it set itself up as a consumer, the next step is for the client application to send a Basic.Ack message with the delivery-tag received in the method frame in step #4. This lets RabbitMQ know that the client application has received the message, and RabbitMQ may now delete the message from the queue.
Example of sending a message acknowledgment with Pika:

```python
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
```
* This workflow is based upon the Basic.Consume method of consuming messages; the flow is slightly different when using Basic.Get.
** I am not deeply familiar with RabbitMQ's internal application architecture and flow. Steps that describe what RabbitMQ is doing internally may not accurately represent its internal flow and are only meant to illustrate what is happening at the logical level of interfacing with RabbitMQ.
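The delivery-tag bookkeeping in steps #3 to #5 can be modeled in a few lines. This is a toy, in-memory sketch (the `ToyChannel` class is hypothetical and is neither a client nor a broker API): it hands out channel-scoped delivery-tags starting at 1, as RabbitMQ does, and keeps each delivered message until it is acknowledged.

```python
from collections import deque


class ToyChannel:
    """Toy model of one channel's view of a queue; not a RabbitMQ client."""

    def __init__(self):
        self.queue = deque()   # messages waiting to be delivered
        self.unacked = {}      # delivery-tag -> message awaiting Basic.Ack
        self.next_tag = 1      # delivery-tags are channel-scoped and never 0

    def publish(self, body):
        # Step #2: the message is routed into the queue.
        self.queue.append(body)

    def deliver(self):
        # Steps #3 and #4: the next message goes out with a fresh delivery-tag.
        body = self.queue.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, delivery_tag):
        # Step #5: once acknowledged, the broker may forget the message.
        del self.unacked[delivery_tag]


channel = ToyChannel()
channel.publish("first")
channel.publish("second")
tag, body = channel.deliver()
channel.ack(tag)
print(tag, body, channel.unacked)  # 1 first {}
```

A message that is delivered but never acknowledged stays in `unacked`, which is exactly the situation the rest of this post deals with.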
Dealing with Rejection
Step #5 concludes the lifespan of a properly consumed message, which covers most consuming behavior, except when there is an error in processing. In the event that a client application is unable to process a message and is able to let RabbitMQ know of the error, it should respond with either a Basic.Reject or a Basic.Nack message.
Basic.Reject is specified in AMQP 0-9-1 with conflicting rules, and because of this conflict, RabbitMQ's implementation of Basic.Reject does not exactly match the specification. The specification states that the client should not use Basic.Reject as a way to keep the broker from redelivering the message to the rejecting client, and then, in contradiction, specifies that the broker should not redeliver the same message to a consumer who rejects it.
If the requeue parameter is True when calling Basic.Reject, RabbitMQ will treat the rejected message as a new message in the queue and may deliver it again to the consumer who rejected it. If the requeue parameter is False, RabbitMQ will discard the message. For more information on the Basic.Reject implementation in RabbitMQ, check out the RabbitMQ blog post on the subject.
Example of rejecting a message with Pika:

```python
channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
```
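A toy sketch of the two requeue outcomes described above, in plain Python with no broker. `reject` is a hypothetical helper operating on a deque of waiting messages and a dict of unacknowledged ones keyed by delivery-tag. It appends a requeued message to the tail, matching the "new message" description; it does not model exactly where any particular RabbitMQ version places requeued messages.

```python
from collections import deque


def reject(queue, unacked, delivery_tag, requeue):
    """Toy model of Basic.Reject for a single delivery-tag."""
    body = unacked.pop(delivery_tag)  # the message is no longer awaiting an ack
    if requeue:
        # requeue=True: back into the queue, where any consumer, including the
        # one that rejected it, may receive it again.
        queue.append(body)
    # requeue=False: the message is simply dropped.


waiting = deque(["c"])
outstanding = {1: "a", 2: "b"}
reject(waiting, outstanding, 1, requeue=True)
reject(waiting, outstanding, 2, requeue=False)
print(list(waiting), outstanding)  # ['c', 'a'] {}
```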
Basic.Nack is a RabbitMQ-only extension of the AMQP protocol and expands upon the Basic.Reject behavior in a few ways. The important distinction in this context is that Basic.Nack can reject multiple messages at once: when its multiple flag is set, it rejects every unacknowledged message on the channel up to and including the given delivery-tag. As a side note, RabbitMQ also uses Basic.Nack as a mechanism for the server to reject a Basic.Publish message sent by a publishing client. An example use case for a server-to-client Basic.Nack is when the user_id property of a message sent with Basic.Publish does not match the authenticated user for the connection.
Example of sending Basic.Nack with Pika:

```python
channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
```
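The multiple flag is what separates Basic.Nack from Basic.Reject here. A small sketch of which delivery-tags a single Basic.Nack covers, given the set of tags still unacknowledged on the channel. `nacked_tags` is a hypothetical helper; with Pika, the flag corresponds to the `multiple` argument of `basic_nack`.

```python
def nacked_tags(unacked_tags, delivery_tag, multiple=False):
    """Return the delivery-tags a single Basic.Nack applies to, oldest first."""
    if multiple:
        if delivery_tag == 0:
            # multiple with tag 0 means "every outstanding message".
            return sorted(unacked_tags)
        # Otherwise: every outstanding tag up to and including delivery_tag.
        return sorted(t for t in unacked_tags if t <= delivery_tag)
    if delivery_tag not in unacked_tags:
        # RabbitMQ treats an unknown delivery-tag as a channel error.
        raise ValueError(f"unknown delivery tag {delivery_tag}")
    return [delivery_tag]


print(nacked_tags({1, 2, 3, 5}, 3, multiple=True))  # [1, 2, 3]
print(nacked_tags({1, 2, 3, 5}, 5))                 # [5]
```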
Since RabbitMQ (as of version 2.3.1) does not have a built-in dead-letter system for messages rejected with requeue=False via either Basic.Reject or Basic.Nack, I use a dead-letter queue for rejected messages. With this approach, I send a Basic.Reject message with requeue=False and then a Basic.Publish with the message to the dead-letter queue.
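A minimal sketch of this dead-letter approach against a Pika-style channel. Assumptions: the dead-letter queue (here the hypothetical name `rejected-messages`) has already been declared, the copy is published through the default exchange (`''`), which routes by queue name, and `RecordingChannel` is a stand-in so the sketch runs without a broker. One deliberate change from the order described above: the sketch publishes the copy before rejecting the original, so a crash between the two calls leaves the original unacknowledged (and eventually redelivered) rather than lost, at the cost of a possible duplicate in the dead-letter queue.

```python
DEAD_LETTER_QUEUE = "rejected-messages"  # hypothetical, pre-declared queue


def dead_letter(channel, delivery_tag, body, properties=None):
    """Copy an unprocessable message to the dead-letter queue, then reject it."""
    # The default exchange ('') routes directly to the queue named by routing_key.
    channel.basic_publish(exchange="", routing_key=DEAD_LETTER_QUEUE,
                          body=body, properties=properties)
    # requeue=False: RabbitMQ drops the original instead of redelivering it.
    channel.basic_reject(delivery_tag=delivery_tag, requeue=False)


class RecordingChannel:
    """Stand-in for a Pika channel that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def basic_publish(self, **kwargs):
        self.calls.append(("basic_publish", kwargs))

    def basic_reject(self, **kwargs):
        self.calls.append(("basic_reject", kwargs))


channel = RecordingChannel()
dead_letter(channel, delivery_tag=7, body=b"unparseable payload")
print([name for name, _ in channel.calls])  # ['basic_publish', 'basic_reject']
```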
Resurfacing messages in limbo
Rejecting messages is a great way to deal with messages you know your client application cannot process, but what happens when a message unexpectedly kills your consumer? In any scenario where you cannot issue a Basic.Ack, Basic.Nack or Basic.Reject to the RabbitMQ server and messages become stuck without being redelivered, you can tell RabbitMQ to redeliver them.
Once your application is consuming, you can send a Basic.Recover message to RabbitMQ, and it will start redelivering messages that have not yet been acknowledged. If you set the requeue flag to True, RabbitMQ will deliver them to any consumer that is eligible for them; if you set it to False, it will only try to redeliver each message to the consumer it was originally sent to, if it can.
Example of this when using Pika:
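A minimal sketch of issuing Basic.Recover with Pika, assuming a connected channel from a recent Pika release, whose `basic_recover()` method takes the `requeue` flag described above. `redeliver_unacked` is a hypothetical helper, and `RecordingChannel` is a stand-in so the sketch runs without a broker:

```python
def redeliver_unacked(channel, requeue=True):
    """Send Basic.Recover: ask RabbitMQ to redeliver this channel's unacked messages."""
    # requeue=True: any eligible consumer may receive them.
    # requeue=False: RabbitMQ tries the original consumer only.
    channel.basic_recover(requeue=requeue)


class RecordingChannel:
    """Stand-in for a Pika channel that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def basic_recover(self, requeue=False):
        self.calls.append(("basic_recover", requeue))


channel = RecordingChannel()
redeliver_unacked(channel)
print(channel.calls)  # [('basic_recover', True)]
```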
Taking this ability to the next level, your application may want to issue a passive Queue.Declare, which returns the count of pending messages in the queue. If you notice that you're not able to consume some messages and the pending message count stays the same, even when your consumer is sitting mostly idle, you could issue a Basic.Recover message to try to unstick them.
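A sketch of that heuristic, assuming Pika's `queue_declare(queue=..., passive=True)`, which returns a frame whose `method.message_count` holds the count from Queue.DeclareOk. That count covers messages waiting in the queue, not ones already delivered and awaiting acknowledgment, so a non-zero count that never moves suggests the consumer has stopped taking new work. `pending_count`, `looks_stuck`, the three-sample threshold, the queue name `work` and the stand-in channel are all illustrative assumptions:

```python
from types import SimpleNamespace


def pending_count(channel, queue_name):
    """Passive Queue.Declare: read the message count without creating the queue."""
    result = channel.queue_declare(queue=queue_name, passive=True)
    return result.method.message_count


def looks_stuck(counts, samples=3):
    """True if the last `samples` counts are all equal and non-zero."""
    recent = counts[-samples:]
    return len(recent) == samples and recent[0] > 0 and len(set(recent)) == 1


class StandInChannel:
    """Mimics the shape of Pika's queue_declare() result; records Basic.Recover."""

    def __init__(self, message_count):
        self.message_count = message_count
        self.recovered = False

    def queue_declare(self, queue, passive=False):
        return SimpleNamespace(method=SimpleNamespace(message_count=self.message_count))

    def basic_recover(self, requeue=False):
        self.recovered = True


channel = StandInChannel(message_count=4)
history = [pending_count(channel, "work") for _ in range(3)]
if looks_stuck(history):
    channel.basic_recover(requeue=True)  # try to unstick the messages
print(history, channel.recovered)  # [4, 4, 4] True
```

In a real consumer the samples would be taken some time apart while the consumer is idle; taking them back to back, as here, only demonstrates the logic.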
I hope this was useful in understanding not only how to deal with failures in message processing, but also that it has given you a glimpse into the underlying AMQP protocol. Want to dive deeper? Check out RabbitMQ's 0-9-1 reference, which is clearer than the protocol spec. Did you not follow what I meant about a passive Queue.Declare? It's okay, I'd like to explain more. Let me know what parts of RabbitMQ and AMQP I should explain in more detail in a future blog post.