
redis_git_dirty?

Have you been wondering what the redis_git_dirty stat in the output of the Redis INFO command is?

Me too, so I downloaded the source and started grepping. It turns out to be a constant generated at build time by the src/mkreleasehdr.sh script. The exact command run to gather the value is:

GIT_DIRTY=`git diff 2> /dev/null | wc -l`

What is it doing? It counts the lines of output that git diff produces for the Redis source tree.
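To make the counting concrete, here is a minimal Python sketch that reproduces the shell pipeline above. The function names are hypothetical. `wc -l` counts newline characters, so the helper does the same, and discarding stderr plus returning 0 when git is missing mirrors the `2> /dev/null` behavior.

```python
import subprocess


def count_newlines(text):
    """Count newline characters, which is what `wc -l` reports."""
    return text.count(b"\n") if isinstance(text, bytes) else text.count("\n")


def git_dirty(source_dir="."):
    """Mirror GIT_DIRTY=`git diff 2> /dev/null | wc -l` from src/mkreleasehdr.sh."""
    try:
        result = subprocess.run(
            ["git", "diff"],
            cwd=source_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,  # same effect as 2> /dev/null
        )
    except OSError:
        # git not installed: the shell pipeline would also report 0
        return 0
    # Like the pipeline, git's exit status is ignored; only stdout is counted
    return count_newlines(result.stdout)
```

Note that a bare `git diff` compares the working tree against the index, so strictly speaking the value reflects unstaged edits; a clean checkout of a release yields 0.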

The purpose? It looks like a development flag indicating how far the compiled version of Redis differs from the last git commit.

Should you monitor it? Nope, it's a waste of bits. In theory, this should always be 0 in a production release.

clihelper

I tend to write a fair number of command-line applications in Python that, more often than not, are meant to run as daemons. I also tend to use the same patterns in doing so. At first I wrote the daemonization code myself, following the general pattern that can be found in many places. Then I discovered python-daemon, the reference implementation of PEP 3143.

For logging, I would often cut and paste the same code from one application to another. After digging into the Python 2.7 logging documentation, I decided that logging's dictConfig was for me, but I needed support in 2.6. I also wanted something I could install via pip instead of copying the code from 2.7's logging package into my own. Thus logging-config was born. *Edit: I have now removed logging-config and moved to logutils thanks to a comment by Vinay Sajip below.

Today, I am releasing clihelper, a Python module that aims to make writing command-line applications and daemons in Python easier. It uses python-daemon and logging/logutils together with a YAML-based configuration file so you can focus on writing the core application rather than on the details of command-line option handling, configuration, logging and daemonization.

Getting started with clihelper is meant to be very straightforward; simply extend the clihelper.Controller class:

import clihelper


class MyApp(clihelper.Controller):
    def _process(self):
        # Runs at the interval specified in the configuration
        self._logger.info('Would be processing at the specified interval now')

Next, in the main guard of the Python module that contains the MyApp class, call clihelper.setup, then call clihelper.run, passing in the class that will be used as your application controller.

if __name__ == '__main__':
    clihelper.setup('MyApp', 'MyApp is just a demo', '0.0.1')
    clihelper.run(MyApp)

Next, the configuration file should be created:

Logging:
  version: 1
  formatters:
    verbose:
      format: '%(levelname) -10s %(asctime)s %(process)-6d %(processName) -15s %(name) -10s %(funcName) -20s: %(message)s'
      datefmt: '%Y-%m-%d %H:%M:%S'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: verbose
      level: DEBUG
      debug_only: True
    file:
      class: logging.handlers.RotatingFileHandler
      formatter: verbose
      level: DEBUG
      filename: example.log
      maxBytes: 1024
      backupCount: 3
  loggers:
    MyApp:
      handlers: [console, file]
      level: DEBUG
      propagate: true
    clihelper:
      handlers: [console, file]
      level: DEBUG
      propagate: true
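The Logging section follows the schema of the standard library's logging.config.dictConfig (version, formatters, handlers, loggers). One wrinkle if you feed it to dictConfig yourself: debug_only is not part of that schema, and dictConfig passes unknown handler keys to the handler's constructor, so it has to be removed first. The sketch below assumes, hypothetically, that debug_only means "attach this handler only when running in the foreground"; that reading and the apply_logging_config helper are assumptions, not clihelper's documented behavior.

```python
import copy
import logging
import logging.config

# Trimmed version of the Logging section above, as a YAML loader would return it
LOGGING = {
    'version': 1,
    'formatters': {
        'verbose': {
            'format': '%(levelname) -10s %(asctime)s %(name) -10s: %(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
            'level': 'DEBUG',
            'debug_only': True,
        },
    },
    'loggers': {
        'MyApp': {'handlers': ['console'], 'level': 'DEBUG', 'propagate': True},
    },
}


def apply_logging_config(config, foreground):
    """Strip debug_only (unknown to dictConfig), then configure logging.

    Hypothetical semantics: debug_only handlers are kept only in the foreground.
    """
    config = copy.deepcopy(config)  # leave the caller's dict untouched
    handlers = config.get('handlers', {})
    for name in list(handlers):
        if handlers[name].pop('debug_only', False) and not foreground:
            del handlers[name]
            # Drop references so dictConfig does not look up a missing handler
            for logger in config.get('loggers', {}).values():
                if name in logger.get('handlers', []):
                    logger['handlers'].remove(name)
    logging.config.dictConfig(config)
    return config
```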

Now invoke your application via the command line. Try passing --help to see the base level options:

Usage: usage: example.py -c  [options]

MyApp is just a demo

Options:
  -h, --help            show this help message and exit
  -c CONFIGURATION, --config=CONFIGURATION
                        Path to the configuration file.
  -f, --foreground      Run interactively in console

clihelper allows you to add your own command-line options, and your application does not need to be interval based; instead, you can use a blocking IOLoop or a similar construct. In the class that extends clihelper.Controller, override the clihelper.Controller._loop method. In addition, you'll likely want to extend the clihelper.Controller._shutdown method to tell the IOLoop to stop when the application has been signalled to stop. An example with the Tornado IOLoop may look something like:

import clihelper
from tornado import ioloop

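class Test(clihelper.Controller):

    def _loop(self):
        # Replace the interval-based loop with Tornado's blocking IOLoop
        self._ioloop = ioloop.IOLoop.instance()
        self._ioloop.start()

    def _shutdown(self):
        # Called when the application is signalled to stop; makes start() return
        self._ioloop.stop()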
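Without Tornado, the same _loop/_shutdown split can be sketched with the standard library. StandInController below is a hypothetical stand-in, not clihelper's real Controller; the only assumption carried over from the text is that the controller calls _loop() to run the application and _shutdown() when it is told to stop. A threading.Event lets _shutdown() wake the blocking loop.

```python
import threading


class StandInController(object):
    """Hypothetical stand-in for clihelper.Controller (not its real code)."""

    def run(self):
        # Assumption: the real controller calls _loop() once setup is done
        self._loop()

    def _loop(self):
        raise NotImplementedError

    def _shutdown(self):
        pass


class EventLoopApp(StandInController):
    """Blocking loop that exits once _shutdown() is called."""

    def __init__(self, tick=0.5):
        self._stop = threading.Event()
        self._tick = tick
        self.ticks = 0

    def _loop(self):
        # wait() returns True once the event is set, False after each timeout
        while not self._stop.wait(self._tick):
            self.ticks += 1  # periodic work would go here

    def _shutdown(self):
        # Setting the event wakes the loop, so run() returns
        self._stop.set()
```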

There are a few other options, so if you're inclined to try it out, I suggest reading the README and the code. I hope that someone else will find this useful. If you have any suggestions or improvements, please do not hesitate to send them my way.

Disabling default iTunes behavior with media keys in OSX

In a search for a non-hacky way to stop the play/pause/next/previous keys on the OSX keyboard from being mapped directly to iTunes, I stumbled across the "Remote control daemon" (rcd). As it turns out, disabling the iTunes launch behavior for these keys is as easy as unloading rcd with the following command:

launchctl unload -w /System/Library/LaunchAgents/com.apple.rcd.plist

I've not run across any negative impact from doing so, and now when I use Enqueue, the buttons control only Enqueue.

Homebrew and RabbitMQ Plugins

Since Homebrew's RabbitMQ install has no plugin option, it can be hard to figure out how to install RabbitMQ plugins. Assuming you used the default location of /usr/local for your Homebrew setup, you only need to copy the .ez files for the plugins to the following directory:

/usr/local/Cellar/rabbitmq/2.4.1/lib/rabbitmq/erlang/lib/rabbitmq-2.4.1/plugins

In my case, I downloaded the plugins to my Downloads folder (~/Downloads) and issued the following command:

mv ~/Downloads/*.ez /usr/local/Cellar/rabbitmq/2.4.1/lib/rabbitmq/erlang/lib/rabbitmq-2.4.1/plugins
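If you install plugins often, the move can be scripted. This is a minimal Python sketch: the plugin path is the version-specific one from this post (RabbitMQ 2.4.1), so adjust it for your install, and install_plugins is a hypothetical helper name.

```python
import glob
import os
import shutil

# Version-specific path from this post; adjust for your RabbitMQ version
PLUGIN_DIR = ('/usr/local/Cellar/rabbitmq/2.4.1/lib/rabbitmq/erlang/lib/'
              'rabbitmq-2.4.1/plugins')


def install_plugins(source_dir, plugin_dir=PLUGIN_DIR):
    """Move every .ez archive in source_dir into plugin_dir; return moved names."""
    moved = []
    for path in sorted(glob.glob(os.path.join(source_dir, '*.ez'))):
        shutil.move(path, plugin_dir)  # moves into the existing directory
        moved.append(os.path.basename(path))
    return moved
```

For example, install_plugins(os.path.expanduser('~/Downloads')) does the same as the mv command above; the server still needs a restart afterwards.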

Once you’ve moved the files there, you’ll need to restart the RabbitMQ server to have them loaded. If you used the launchctl instructions during the RabbitMQ brew install for starting and adding the service, you should be able to issue two commands in terminal:

launchctl stop com.rabbitmq.rabbitmq-server
launchctl start com.rabbitmq.rabbitmq-server

That’s all there is to it. If you installed the Management plugin, you can now test by going to http://localhost:55672.

The Future of Pika

I've been listening to a lot of the conversations at PyCon about asynchronous development. The basic sentiment I've picked up on is that Callback Passing Style (CPS) is not currently in favor in the Python community. This, in addition to the popular use of the BlockingConnection in Pika, has led me to think about how to plan Pika's future enhancements. After some conversation with Tony, I believe I have an outline that should appeal to Python developers while keeping Pika asynchronous at its core and retaining CPS, which I think is very powerful and still very important to Pika's future.

After I release 0.9.5, I will start development on Pika 2.0, an effort to create a more Pythonic approach to using Pika while retaining the ability to use CPS and keeping it asynchronous at its core.

The roadmap for changes to Pika 2.0:

- Backwards incompatible change that drops Python 2.4, 2.5 support
- Add Python 3 support
- Remove existing connection adapter system
- Implement a new, behavior-based usage pattern that supports both asynchronous callback passing style and "Pythonic" development.
  - Both behaviors available from the same API calling same classes and methods
  - Async:
    - Merge existing connections into one connection system with IOLoop override
      - Supporting internal IOLoop, tornado, twisted
  - Pythonic:
    - high-level blocking on synchronous AMQP commands
    - Generator for receiving messages from Basic.Publish
- API notation closer to AMQP spec for implementation.
- *.*Ok frames will only be passed back in CPS use.
  - Calling methods like queue.declare will return a success indicator and attributes returned in the Ok frame will be assigned to attributes of the class.
- basic.consume and basic.get will return a single object with a materialized view of the Method, Header and Body frames.
- Build in support for specific broker types and pure AMQP 0-9-1.
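The generator and "materialized view" items above can be illustrated with a small, self-contained sketch. None of these names are Pika's API, current or planned: MethodFrame, HeaderFrame, Message and consume are hypothetical, and frames arrive as a plain iterable rather than from a socket.

```python
from collections import namedtuple

# Hypothetical frame shapes, not Pika's classes
MethodFrame = namedtuple('MethodFrame', 'channel delivery_tag')
HeaderFrame = namedtuple('HeaderFrame', 'body_size properties')


class Message(object):
    """One delivery with its Method, Header and Body frames combined."""

    def __init__(self, method, header, body):
        self.channel = method.channel
        self.delivery_tag = method.delivery_tag
        self.properties = header.properties
        self.body = body


def consume(frames):
    """Yield one Message per Method, Header, Body... sequence.

    Body frames are concatenated until header.body_size bytes have arrived.
    Assumes the input contains only complete deliveries.
    """
    frames = iter(frames)
    for method in frames:
        header = next(frames)
        body = b''
        while len(body) < header.body_size:
            body += next(frames)
        yield Message(method, header, body)
```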

Here's an example of what I expect Pika 2.0 to look like for non-CPS use. Note this is more of an idea of how it will work for someone using Pika than a spec or actual code. 
import sys

import pika
from pika.rabbitmq import Connection
from pika import Basic
from pika import Channel
from pika import Exchange
from pika import Queue

# All the attributes can be passed in via constructor or assigned
connection = Connection()
connection.host = 'localhost'
connection.port = 5672
connection.user = 'guest'
connection.password = 'guest'  # "pass" is a reserved word in Python
connection.vhost = '/'

# Not much new here
try:
    connection.open()
except pika.ConnectException as e:
    print("Could not connect: %s" % e)
    sys.exit(1)

# Channel construction outside of connection context, instead pass
# the Connection in
channel = Channel()
try:
    channel.open(connection)
except pika.TimeoutException as e:
    print("Could not open a channel: %s" % e)
    sys.exit(1)
except pika.ConnectionClosedException:
    print("Could not open a channel, the connection is closed")
    sys.exit(1)

# All the attributes can be passed in via constructor or assigned
exchange = Exchange(channel)
exchange.name = 'not_microsoft'
exchange.type = 'fanout'
exchange.durable = True
exchange.declare()

# All the attributes can be passed in via constructor or assigned
queue = Queue(channel)
queue.name = 'my_queue'
queue.auto_delete = False
queue.durable = True
queue.passive = False

# Declare the queue and expect a bool
if not queue.declare():
    raise Exception("Could not declare my queue")

# Print info about the queue that was mapped automatically when
# Queue.DeclareOk was received
print('Queue "%s"' % queue.name)
print(' Depth     : %i' % queue.message_count)
print(' Consumers : %i' % queue.consumer_count)

# Bind the queue
queue.bind(exchange=exchange, routing_key='not_microsoft.my_queue')

# Generator returning one object per message, consuming from the queue above
for message in Basic.consume(channel, queue=queue.name):
    print('Delivery Tag   : %s' % message.delivery_tag)
    print('Channel        : %i' % message.channel)
    print('Body Size      : %i' % len(message.body))
    print('Properties')
    print('  Content-Type : %s' % message.properties.content_type)
    print('  Timestamp    : %s' % message.properties.timestamp)
    print('  User Id      : %s' % message.properties.user_id)
    print('  App Id       : %s' % message.properties.app_id)
    print('Body           : %s' % message.body)
I am looking for feedback on this direction. Do these changes and the example make sense to existing Pika and RabbitMQ users? Would you change anything about this direction? What would you improve?

PyCon by Network Adapter Manufacturer

On Friday, Doug Hellmann of PyMOTW fame commented on Twitter that he'd be interested in the breakdown of laptop vendors for people attending PyCon, something I had been pondering that morning as well.

In an attempt to approximate this breakdown, this morning I ran Kismet to listen on the network for approximately 10 minutes, during which it collected 2,371 MAC addresses. I then wrote a script that looked up the manufacturer for each MAC address Kismet found. I was surprised by the inconsistency of the company names in the MAC address vendor database. I cleaned up the multiple versions of the manufacturer strings and ended up with data I could easily chart.
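The lookup-and-clean-up step can be sketched in Python. The first three octets of a MAC address (the OUI) identify the vendor. The OUI_TABLE entries below are made-up placeholders rather than real assignments, and the suffix-stripping rule in normalize_vendor is an illustrative guess at the cleanup the vendor strings need, not the original script.

```python
import re
from collections import Counter

# Placeholder OUI -> vendor strings, written the inconsistent way a vendor
# database might list them. Not real assignments.
OUI_TABLE = {
    '00:11:22': 'Example Computer, Inc.',
    '00:11:33': 'EXAMPLE COMPUTER INC',
    '00:44:55': 'Sample Networks Corp.',
}

# Corporate suffixes that vary between entries for the same vendor
SUFFIXES = re.compile(r'\b(inc|corp|corporation|co|ltd|llc)\b\.?', re.IGNORECASE)


def normalize_vendor(name):
    """Collapse spelling variants: drop suffixes and punctuation, fold case."""
    name = SUFFIXES.sub('', name)
    name = re.sub(r'[^a-z0-9 ]', '', name.lower())
    return ' '.join(name.split())


def count_vendors(macs, table=OUI_TABLE):
    counts = Counter()
    for mac in macs:
        # The OUI is the first three octets, e.g. "00:11:22" of "00:11:22:aa:bb:cc"
        prefix = mac.upper().replace('-', ':')[:8]
        vendor = table.get(prefix)
        counts[normalize_vendor(vendor) if vendor else 'unknown'] += 1
    return counts
```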

The results confirmed the impression one gets just walking around and looking at what people are using:
[Chart: device counts by network adapter manufacturer]

A pretty strong showing of Apple devices. It appears that the lower end numbers are either mobile devices on the network or network gear. If you're interested in the count data, here's a text file with the breakdown.

Deeper down the rabbit hole of message (re)delivery

I've posted in the past about RabbitMQ and how to get started with it. I thought I'd follow up with something a little deeper, but still pretty entry level from a usage perspective. I've been spending a lot of time with RabbitMQ lately, as I'm now the active maintainer of Pika, the Python package for interfacing with it. Part of diving deep into Pika has been becoming better versed in the AMQP standard.

AMQP, the Advanced Message Queuing Protocol, is a protocol standard developed by a consortium of organizations and developers including Rabbit Technologies (now VMware/Spring), Red Hat, JPMorgan Chase, Cisco and others. While RabbitMQ supports multiple protocols including STOMP and HTTP, AMQP is its primary protocol and strongly defines RabbitMQ's functional behavior, even when using other protocols. It is this strong tie between RabbitMQ and AMQP that has led me to conclude that to fully understand RabbitMQ, you first need to fully understand AMQP.

To that end, RabbitMQ has provided an excellent yet terse AMQP and API Quick Reference.  For the purposes of the information below I will be referencing AMQP version 0-9-1 and RabbitMQ 2.3.1 only.

Going down the Rabbit Hole

In this post I wanted to focus on how consumers can work with RabbitMQ to ensure that when messages are not processed correctly in client applications, they do not get dropped on the floor. To get there, let me first explain the different connotations of the term "message" when talking about RabbitMQ and AMQP.

The AMQP protocol specifies the semantics for sending different types of protocol level messages by categorizing the type of message by class and method in Class.Method format. Examples of this include Queue.Declare and Basic.Publish. If you're a Pika user you may know these as Channel.queue_declare() and Channel.basic_publish(). When I refer to a message in Class.Method format, I mean the protocol message.

The class Basic encompasses most of the functionality related to sending and receiving application level messages. When we use Basic.Publish we are constructing a protocol level message with the application level message as a payload. It is the application level message that is received by the consumer application as a combination of a header and body. 

Instead of focusing on how different AMQP and RabbitMQ clients are used from a programmer's perspective, I will primarily reference AMQP protocol level constructs, with few references to client implementations.

Message Lifecycle

To illustrate the lifecycle of sending and receiving an application level message, the following steps are taken with a client application using the Basic.Consume method of receiving messages:
  1. Publishing application calls a library method for Basic.Publish with the routing information, the application level message and its properties as defined by the properties attribute of the Basic class. The client library constructs the Basic.Publish message and delivers it to the RabbitMQ server.

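    Example of publishing a message with Pika:
    import time
    from pika import BasicProperties

    # AMQP timestamps are integer seconds; delivery_mode=1 is non-persistent
    properties = BasicProperties(timestamp=int(time.time()),
                                 content_type="text/plain",
                                 delivery_mode=1)

    channel.basic_publish(exchange='example',
                          routing_key="example.test",
                          body=message,
                          properties=properties)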
  2.  The server receives the Basic.Publish message and routes it into the appropriate queue as specified by the exchange and routing key.
  3.  Assuming that our client application is operating properly and RabbitMQ is ready to deliver it a message, when the message published in Step #1 is ready to be delivered**, RabbitMQ constructs a Basic.Deliver message with the payload of the Basic.Publish message it received in step #2.
  4.  The consuming application's RabbitMQ client library will receive the message, decode it and surface three distinct parts of the protocol message to the consuming application:
    1.  The Method frame defines the core information about the protocol message. This includes the channel number the message was delivered on and the delivery-tag for the message.  The delivery-tag in conjunction with the current connection and channel number uniquely identifies the message when communicating with RabbitMQ.
    2.  The Header frame defines the properties of the application message. In the case of our example from step #1, this includes the timestamp and content_type as well as properties about the body frame such as the content length.
    3.  The Body frame is the raw application level message data as assigned to the body parameter of channel.basic_publish() in step #1.
  5.  Assuming that the client application did not specify the "noack" option when it set itself up as a consumer, the next step is for the client application to send a Basic.Ack message with the delivery-tag received in the Method frame in step #4. This lets RabbitMQ know the client application has received the message and RabbitMQ may now delete the message from its stack.
    Example of sending a message acknowledgment with Pika:
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 * This workflow is based on the Basic.Consume method of consuming messages; the flow is slightly different when using Basic.Get.

** I am not deeply familiar with internal RabbitMQ application architecture and flow. Steps that describe what RabbitMQ is doing internally may not accurately represent the internal RabbitMQ flow and are only meant to illustrate what is happening at a logical level when interfacing with RabbitMQ.
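To make the bookkeeping in steps 1 through 5 concrete, here is a toy, in-memory model in Python. ToyChannel and its methods are hypothetical and model only the logical flow (a queue, a delivery-tag per delivery, and a table of unacknowledged messages), not RabbitMQ's internals or any client library.

```python
from collections import deque
from itertools import count


class ToyChannel(object):
    """In-memory model of the message lifecycle; not a real AMQP client."""

    def __init__(self):
        self.queue = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery-tag -> message awaiting Basic.Ack
        self._tags = count(1)  # delivery-tags: positive, increasing per channel

    def publish(self, body):
        # Steps 1-2: Basic.Publish is routed into the queue
        self.queue.append(body)

    def deliver(self):
        # Steps 3-4: Basic.Deliver carries a new delivery-tag in the Method frame
        body = self.queue.popleft()
        tag = next(self._tags)
        self.unacked[tag] = body
        return tag, body

    def ack(self, delivery_tag):
        # Step 5: Basic.Ack lets the broker forget the message
        del self.unacked[delivery_tag]
```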

Dealing with Rejection

With step #5 we have concluded the lifespan of a properly consumed message and this covers a majority of the consuming behavior except when there is an exception in processing. In the event that a client application is unable to process a message and is able to let RabbitMQ know of the error, it should respond with either a Basic.Reject or Basic.Nack message.

Basic.Reject is specified in AMQP 0-9-1 with conflicting rules. Due to this conflict, RabbitMQ implements Basic.Reject with a behavior that does not match the exact behavior from the specification. The specification states that the client should not use Basic.Reject as a way to keep the broker from redelivering the message to the rejecting client and then in contradiction specifies that the broker should not redeliver the same message to a consumer who rejects it.  

If the requeue parameter is True when calling Basic.Reject, RabbitMQ will treat the rejected message as a new message in the queue and may deliver it again to the consumer who rejected it. If the requeue parameter is False, RabbitMQ will delete the message from its stack. For more information on the Basic.Reject implementation in RabbitMQ, check out the RabbitMQ blog post on the subject.

Example rejecting a message with Pika:

channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
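The two requeue outcomes can be modeled in a few lines. The reject function and its arguments are hypothetical: an in-memory model of the behavior described above, not RabbitMQ's implementation or Pika's API.

```python
def reject(queue, unacked, delivery_tag, requeue):
    """Model Basic.Reject (simplified, in memory).

    queue: list of messages awaiting delivery
    unacked: dict of delivery-tag -> message not yet acknowledged
    """
    message = unacked.pop(delivery_tag)
    if requeue:
        # Back in the queue, so any consumer, including the one that
        # rejected it, may receive it again. This toy appends it; a real
        # broker may place it differently.
        queue.append(message)
    # requeue=False: the message is simply dropped
    return message
```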

Basic.Nack is a RabbitMQ-only extension of the AMQP protocol and expands upon the Basic.Reject behavior in a few ways. The important distinction in this context is that a single Basic.Nack can reject multiple messages: with its multiple flag set, it covers every unacknowledged delivery-tag up to and including the one it carries. As a side note, RabbitMQ also uses Basic.Nack as a mechanism for the server to reject a Basic.Publish message sent from a publishing client. An example use case for server-to-client Basic.Nack is when the user_id property of a message sent with Basic.Publish does not match the authenticated user for the connection.

Example of sending Basic.Nack with Pika:

channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)
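With the multiple flag set, a Basic.Nack's delivery-tag is treated as "up to and including", so one method can reject a whole batch. Pika's basic_nack accepts a multiple argument for this. The nack_targets helper below is a hypothetical illustration of which outstanding tags a single Basic.Nack covers.

```python
def nack_targets(unacked_tags, delivery_tag, multiple):
    """Return the delivery-tags that one Basic.Nack covers.

    With multiple=True, every outstanding tag up to and including
    delivery_tag is covered; otherwise only delivery_tag itself.
    """
    if multiple:
        return sorted(tag for tag in unacked_tags if tag <= delivery_tag)
    return [delivery_tag] if delivery_tag in unacked_tags else []
```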

Since RabbitMQ currently does not have a built-in dead-letter system for messages rejected with requeue=False via either Basic.Reject or Basic.Nack, I use a dead-letter queue for rejected messages. With this approach, I send a Basic.Reject message with requeue=False and then a Basic.Publish with the message to the dead-letter queue.
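Here is a sketch of that dead-letter step, written against the Pika calls used earlier in this post (basic_publish and basic_reject) with keyword arguments. The dead-letter queue name and the dead_letter helper are hypothetical, and the queue is assumed to be declared already. Unlike the order described above, this version publishes before rejecting: if the publish call raises, the original message stays unacknowledged rather than being dropped.

```python
DEAD_LETTER_QUEUE = 'dead-letter'  # hypothetical queue name, declared elsewhere


def dead_letter(channel, method_frame, properties, body):
    """Copy an unprocessable message to the dead-letter queue, then reject it."""
    # The default exchange ('') routes directly to the queue named by routing_key;
    # reusing the original properties keeps content type, timestamp, etc.
    channel.basic_publish(exchange='',
                          routing_key=DEAD_LETTER_QUEUE,
                          body=body,
                          properties=properties)
    # requeue=False: RabbitMQ drops the original instead of redelivering it
    channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False)
```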

Resurfacing messages in limbo

Rejecting messages is a great way to deal with messages you knowingly cannot process in your client application, but what happens when a message causes your consumer to die unexpectedly? In any scenario where you cannot issue a Basic.Ack, Basic.Nack or Basic.Reject to the RabbitMQ server and messages become stuck without being redelivered, you can tell RabbitMQ to redeliver them.

Once your application is consuming, send a Basic.Recover message to RabbitMQ and it will start redelivering messages that have yet to be acknowledged. If you set the requeue flag to True, it will deliver them to any eligible consumer; if you set it to False, it will try to redeliver them only to the consumer they were originally sent to, if it can.

Example of this when using Pika:

channel.basic_recover()
Taking this ability to the next level, your application may want to issue a passive Queue.Declare, which returns the count of pending messages in the queue. If you notice that you're not able to consume some messages and the pending message count stays the same, even while you're sitting mostly idle, you could issue a Basic.Recover message to try to unstick them.
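The heuristic in the paragraph above reduces to a small check over successive pending-message counts, sampled for example from the message_count of each Queue.DeclareOk. The looks_stuck helper and its min_samples threshold are hypothetical; when it returns True, the application would issue Basic.Recover. Note that, per the 0-9-1 specification, that message count covers messages not awaiting acknowledgement.

```python
def looks_stuck(pending_counts, min_samples=3):
    """True if the last min_samples counts are identical and non-zero.

    pending_counts: message_count samples taken while idle, oldest first.
    """
    recent = pending_counts[-min_samples:]
    return (len(recent) == min_samples
            and recent[0] > 0
            and all(sample == recent[0] for sample in recent))
```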

I hope this was useful in understanding not only how to deal with failures in message processing, but also has given you a glimpse into the underlying AMQP protocol. Want to dive deeper? Check out RabbitMQ's 0-9-1 reference, which is clearer than the protocol spec. Did you not follow what I meant about a passive Queue.Declare? It's okay, I'd like to explain more. Let me know what parts of RabbitMQ and AMQP I should explain in more detail in a future blog post.

Announcing Vorpal Bunny 0.1

Vorpal Bunny is an experimental rabbitmq-jsonrpc-channel PHP driver designed to allow expedited single call HTTP delivery for Basic.Publish calls to RabbitMQ.

The goal is a lightweight tool that offers higher throughput and lower protocol overhead when each application execution makes a single Basic.Publish call.

Our use case is focused on PHP web applications that span hundreds of servers with hundreds of backends each. With the existing AMQP drivers for PHP, applications cannot currently maintain persistent connections and publish across application executions; instead, they connect, publish and disconnect. Vorpal Bunny is intended to reduce the overhead on the RabbitMQ server and the PHP application by using the jsonrpc-channel plugin, which maintains its own channels per session inside the running RabbitMQ instance, eliminating most of the AMQP protocol overhead for one-off message delivery.

This is experimental for us and has not entered production, however I felt it may be useful for those out there who are exploring the rabbitmq-jsonrpc-channel plugin as well.

Example use:

$vb = new VorpalBunny( 'localhost' );
$vb->publish( "", "test", "Hello World!" );

The design tries to employ PHP's APC cache if available to keep one session with the RPC server active per Apache server on the RabbitMQ server. Application flow is as follows:
  1. Construct the object, determining whether the APC cache is available
  2. On the first call to publish, establish a session with the rabbitmq-jsonrpc-channel plugin, then send the message
  3. On subsequent calls, publish sends the message without trying to establish a session, then checks for an error indicating that the session has timed out or is no longer available. If so, it re-establishes the session and retries the delivery.

Vorpal Bunny is available on GitHub at https://github.com/myYearbook/VorpalBunny and you can also download 0.1 directly at https://github.com/myYearbook/VorpalBunny/zipball/0.1
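The session-reuse flow can be sketched language-agnostically; here it is in Python rather than PHP. Everything below is hypothetical: a plain dict stands in for APC, and the transport object, with open_session() and publish(), stands in for the JSON-RPC calls, whose real wire format is not shown.

```python
class SessionExpired(Exception):
    """Raised by the hypothetical transport when the RPC session is gone."""


class CachedSessionPublisher(object):
    """Reuse a cached session; re-establish it once if it has expired.

    cache stands in for APC (a dict here); transport is a hypothetical object
    with open_session() and publish(session, exchange, routing_key, body).
    """

    CACHE_KEY = 'vorpal_bunny_session'  # hypothetical cache key

    def __init__(self, transport, cache=None):
        self.transport = transport
        self.cache = cache if cache is not None else {}

    def _session(self, renew=False):
        # Step 2: open a session only if none is cached (or it must be renewed)
        if renew or self.CACHE_KEY not in self.cache:
            self.cache[self.CACHE_KEY] = self.transport.open_session()
        return self.cache[self.CACHE_KEY]

    def publish(self, exchange, routing_key, body):
        try:
            return self.transport.publish(self._session(), exchange,
                                          routing_key, body)
        except SessionExpired:
            # Step 3: session timed out or vanished, so renew it and retry once
            return self.transport.publish(self._session(renew=True),
                                          exchange, routing_key, body)
```

Passing the same cache dict to several publishers plays the role APC plays across PHP requests: only the first one opens a session.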

Automagical Docstring Linking in Sphinx

I've been trying to pick up Sphinx to document my projects and spent a few hours today trying to get it to do what I want. In essence, I was looking for a way to use the autodoc functionality to automatically replace module.Class references in docstring text with Python domain markup.

I would have expected this functionality to be included, and if it is, I could not find it. What I did find, however, was the autodoc-process-docstring event, for which I was able to write a handler. In this handler, I use a regex to search for module.Class references and replace them in place with :py:meth:`~module.Class`, which auto-links to the appropriate document and location in the Sphinx output for the docstring.

Since I could not find any examples of this, nor any mention or example of using autodoc-process-docstring, I present to you my implementation. To use, just append this to your conf.py in your Sphinx project.

import re

# Dotted names such as module.Class (at least one dot)
py_meth = re.compile(r"(([a-z_]+)([.][a-z_]+)+?)", re.IGNORECASE)


def is_module(test_string):
    """True if test_string, or everything before its last dot, imports."""
    try:
        __import__(test_string)
        return True
    except ImportError:
        parts = test_string.split('.')
        try:
            __import__('.'.join(parts[:-1]))
            return True
        except ImportError:
            return False


def process_docstring(app, what, name, obj, options, lines):
    """autodoc-process-docstring handler: link importable references in place."""
    def link(match):
        text = match.group(0)
        if is_module(text):
            return ':py:meth:`~%s`' % text
        return text

    for index, line in enumerate(lines):
        # re.sub rewrites each match once, so a reference that appears
        # twice on a line is not wrapped twice
        lines[index] = py_meth.sub(link, line)


def setup(app):
    app.connect('autodoc-process-docstring', process_docstring)
I'm checking whether my matches are actually importable and, if they are, replacing them with the :py:meth: syntax. Since my Python project is local to the scope of the Sphinx project, the import test is valid.