Blog Post

Python SDK and Twisted

Mark Nunberg of Couchbase

I'm working on a Twisted interface to the Couchbase client (https://github.com/couchbase/couchbase-python-client). The link there points to the synchronous interface. The experimental twisted branch is at https://github.com/couchbaselabs/couchbase-twisted-client

To explain how the Twisted client works, I'll explain a bit about how the C extension works internally.

Basic SDK Internals

libcouchbase: The cross-platform library

The synchronous client itself is a C extension that uses libcouchbase (http://github.com/couchbase/libcouchbase). libcouchbase acts as the common platform layer for a number of Couchbase clients written in other languages (like Ruby, PHP, node.js, Perl, and several others).

 

The Python extension is a C extension using Python's C API (why ffi, Cython, etc. are not used is another matter for discussion, but I'll just say I've been very happy with Python's C API and the general stability and maintenance of the extension thus far).

 

The way the extension works is fairly involved. The exposed API shown to Python offers a variety of data access methods (e.g. set, get, add, append, delete; as well as variants which can operate on multiple values, like set_multi, get_multi etc.).  These methods call directly into the Python extension which does some argument validation and then schedules these operations into libcouchbase (using e.g. lcb_set(), lcb_get(), etc). Once these operations are scheduled, the extension then calls lcb_wait() which blocks until the results for the scheduled operations have been completed (or have failed with an error).

The general code looks like this:

lcb_get_cmd_t command = { 0 }; /* zero the command so unused fields are well-defined */
const lcb_get_cmd_t *cmdp = &command;
command.v.v0.key = key; /* where 'key' is extracted from some PyObject */
command.v.v0.nkey = nkey; /* length of the key */
command.v.v0.exptime = exptime; /* optional, extracted from parameter */

errstatus = lcb_get(instance, /* library handle */
                    cookie, /* opaque pointer */
                    1, /* number of items in list */
                    &cmdp /* command "list" */);

/** check 'errstatus' */
errstatus = lcb_wait(instance); /* block */

 

lcb_wait runs an internal event loop implemented by libcouchbase, and eventually calls the result callbacks.

The actual results are not delivered from libcouchbase as a return value, but are instead passed to a callback. The idea behind using callbacks is that the result structure contains pointers to network buffers. If the results were actually 'returned' by the library, the library would have to copy the temporary network buffers into allocated memory and require the user to free them, which would hurt performance. The result callback looks something like this:

void get_callback(lcb_t instance, const void *cookie, lcb_error_t status, const lcb_get_resp_t *resp)
{
    char *key = resp->v.v0.key;
    char *value = resp->v.v0.bytes;
    printf("Got result for %.*s: %.*s\n", resp->v.v0.nkey, key, resp->v.v0.nbytes, value);
    printf("Flags: %lu, CAS %llu\n", resp->v.v0.flags, resp->v.v0.cas);
}

(Const-correctness and casting omitted for the sake of clarity).

The user of the C libcouchbase library must install callbacks to be invoked for specific operations. These callbacks receive the normal response information (i.e. response, error status, etc.) as well as a per-operation opaque pointer (the 'cookie') which is passed by the user (in this case the C extension). This pointer is user-controlled and user-managed, and generally holds context- or application-specific data which the user can use to associate a response with a given request.
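
The schedule/wait/callback flow described above can be modelled in a few lines of pure Python. This is only a toy sketch to show the control flow: 'ToyInstance', 'on_get' and the status values 0 and 1 are made up for illustration and are not part of libcouchbase or the extension.

```python
from collections import deque


class ToyInstance:
    """Hypothetical stand-in for a library handle; not the real libcouchbase API."""

    def __init__(self, store):
        self._store = store        # pretend server-side data
        self._pending = deque()    # operations scheduled but not yet completed
        self.get_callback = None   # installed by the user before waiting

    def get(self, cookie, key):
        # Scheduling only queues the request; nothing is delivered yet.
        self._pending.append((cookie, key))

    def wait(self):
        # Drain the queue, handing each response to the installed callback
        # together with the cookie supplied when the operation was scheduled.
        while self._pending:
            cookie, key = self._pending.popleft()
            status = 0 if key in self._store else 1
            self.get_callback(self, cookie, status, key, self._store.get(key))


def on_get(instance, cookie, status, key, value):
    # The cookie is whatever the caller passed in; here it is a plain dict.
    cookie[key] = (status, value)


instance = ToyInstance({"foo": b"bar"})
instance.get_callback = on_get

results = {}
instance.get(results, "foo")
instance.get(results, "missing")
assert results == {}   # nothing is delivered until wait() runs
instance.wait()
print(results)
```

The point of the cookie is visible in 'on_get': the callback has no return value to hand back, so it writes into the object the caller supplied.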

Interacting with libcouchbase and CPython

From the perspective of the public API exposed by the extension, each operation results in a 'Result' instance (or a suitable subclass thereof) being returned to the user. The 'Result' object contains information about the result of the operation and any associated metadata. In interacting with the C library, the Result object is an extension class which inherits from PyObject and is allocated before each request to the C library. It is then passed as the 'cookie' argument to the scheduled operation. Once the C library invokes the installed callback, the callback populates the Result object with the relevant information. Finally, when the event loop exits, the Result object is returned to the user.

A simplistic implementation would look like this:

static PyObject *
Connection_get(pycbc_Connection *self, PyObject *args)
{
    const char *key;
    if (!PyArg_ParseTuple(args, "s", &key)) {
         return NULL;
    }

    lcb_get_cmd_t command = { 0 }, *commandp = &command;
    command.v.v0.key = key;
    command.v.v0.nkey = strlen(key);
    pycbc_ValueResult *vresult = PyObject_CallObject(&pycbc_ValueResultType, NULL);
    lcb_get(self->instance, vresult, 1, &commandp);
    lcb_wait(self->instance);
    return vresult;
}

 

lcb_wait() will block until the result has arrived and its callback has been invoked. The callback would look something like this:

void get_callback(lcb_t instance, const void *cookie, lcb_error_t err, const lcb_get_resp_t *resp)
{
    pycbc_ValueResult *result = (pycbc_ValueResult *)cookie;
    result->value = PyString_FromStringAndSize(resp->v.v0.bytes, resp->v.v0.nbytes);
    result->rc = err;
    result->flags = resp->v.v0.flags;
    result->cas = resp->v.v0.cas;
}

[ Note that because the SDK supports things like Transcoders and different value formats, the actual code converting to and from keys and values is significantly more complex than this, but isn't relevant to the actual architecture being discussed here ].

To be more specific, the Result object is only allocated from within the callback, and what is actually allocated before the wait sequence is a MultiResult object which is a dict subclass. This MultiResult object extends dict by adding some specific internal parameters; for each callback invoked during the wait sequence, a new Result object is allocated, and inserted into this MultiResult dict with the key being the key of the data item being accessed. Thus the code actually looks something like this:

/* .... */
pycbc_MultiResult *mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
lcb_get(self->instance, mres, 1, &commandp);
/* ... */

and in the callback:

/* ... */
pycbc_MultiResult *mres = (pycbc_MultiResult *)cookie;
pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);

/* .. assign members of vres */

PyObject *keyobj = PyString_FromStringAndSize(resp->v.v0.key, resp->v.v0.nkey);
vres->key = keyobj;
PyDict_SetItem(&mres->dict, keyobj, vres);
Py_DECREF(vres);
/* ... */

This internal design, while complex, allows for very efficient code re-use (and thus testing and debugging) and performs rather nicely. This abstraction is also implemented entirely in pure C, which keeps the overhead minimal.
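
For readers more comfortable with Python than with the C API, here is a rough pure-Python model of the MultiResult idea: one dict subclass is allocated up front, and each callback inserts a per-key Result into it. The class layout, the 'all_ok' attribute and the 'get_multi' helper are assumptions made for this sketch, not the extension's actual definitions.

```python
class Result:
    """Hypothetical, simplified stand-in for the extension's ValueResult."""

    def __init__(self, key, value, rc=0):
        self.key = key
        self.value = value
        self.rc = rc


class MultiResult(dict):
    """dict subclass that carries some extra bookkeeping next to the items."""

    def __init__(self):
        super().__init__()
        self.all_ok = True   # assumed internal parameter, for illustration only


def get_callback(cookie, key, value, rc):
    # The cookie is the MultiResult allocated before the wait sequence.
    mres = cookie
    mres[key] = Result(key, value, rc)   # one Result per callback invocation
    if rc != 0:
        mres.all_ok = False


def get_multi(store, keys):
    mres = MultiResult()
    for key in keys:
        # Stands in for "schedule, wait, and have the library call us back".
        rc = 0 if key in store else 1
        get_callback(mres, key, store.get(key), rc)
    return mres


mres = get_multi({"a": b"1"}, ["a", "b"])
print(sorted(mres), mres["a"].value, mres["b"].rc, mres.all_ok)
```

Because the container is itself a dict, the single-key and multi-key operations can share the same callback; a single-key call just ends up with a one-item dict.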

Implementing Async I/O

Event Loop Integration

The aforementioned C library offers an extensible IO plugin interface. Earlier I mentioned that the C library uses its own internal event loop, but that's not entirely true; rather what the library does is expose an I/O plugin API that allows one to implement their own functions for scheduling common asynchronous primitives such as socket watchers and timeout events. Because the library itself is entirely asynchronous, it allows integration with non-blocking program flow.

This API may be seen here: https://github.com/couchbase/libcouchbase/blob/62300167f6f7d0f84ee6ac2162591805dfbf163d/include/libcouchbase/types.h#L196-221

Thus in order to integrate with Twisted's reactor event loop, I had to write an 'IO plugin' which would implement the common async primitives using Twisted's reactor methods; the result thereof can be seen here, and on its own is, I think, fairly straightforward: https://github.com/couchbaselabs/couchbase-twisted-client/blob/twisted/txcouchbase/iops.py

[Note that the 'is_sync' property is just there to test basic functionality in a synchronous pattern using Twisted's reactor as the event loop backend. This is not the default].

In order to expose this IO plugin interface to Python, I made Python wrapper classes for these primitives, so we have IOEvent, TimerEvent, etc. These objects contain internal pointers to C callback data. Additionally, there is the 'IOPS' class, which manages one or more of these objects. The C library calls (via the extension) into the 'IOPS' object, passing it one of the Event objects and requesting some kind of scheduling modification (e.g. watch, unwatch, destroy). The IOPS object then calls into the actual event loop implementation (in this case, the reactor) to schedule the desired event, passing it the relevant Event object. When the event is ready, one of the Event's 'ready_*' methods is called, which calls into C. Understandably, this all causes somewhat of a performance hit, but with the benefit that the code is now able to interact asynchronously with any Python event loop.

Putting it all together, it looks something like this:

When libcouchbase first creates a socket, it associates it with some kind of IO Event object, which is exposed as a pointer:

void *event = instance->iops->create_event(instance->iops);

 

This calls into our Python code which looks like this:

static void *
create_event(lcb_io_opt_t iobase)
{
    pycbc_iops_t *io = (pycbc_iops_t *)iobase;
    PyObject *event_factory = PyObject_GetAttrString(io->py_impl, "create_event");
    PyObject *event = PyObject_CallObject(event_factory, NULL);
    Py_DECREF(event_factory); /* drop the temporary bound-method reference */
    return event;
}

This 'event_factory' must return a subclass of IOEvent, with some added fields; it can look something like this:

from couchbase._libcouchbase import IOEvent

class MyIOEvent(IOEvent):
    def doRead(self):
        self.ready_r()

    def doWrite(self):
        self.ready_w()


class IOPS(object):
    def create_event(self):
        return MyIOEvent()

Now, when the library wants to be notified when a given socket becomes readable, it does something like this:

instance->iops->update_event(instance->iops, event, sockfd, LCB_READ_EVENT, some_callback, some_pointer);

Which then calls this:

static void
update_event(lcb_io_opt_t iobase, void *event, int sockfd, short flags, void (*callback)(int, short, void *), void *data)
{
    pycbc_iops_t *io = (pycbc_iops_t *)iobase;
    pycbc_IOEvent *ev = (pycbc_IOEvent *)event;

    ev->fd = sockfd; /* storage for '.fileno()' */
    ev->callback_info.callback = callback;
    ev->callback_info.data = data;

    PyObject *args = Py_BuildValue("(Oi)", ev, (int)flags);
    PyObject *meth = PyObject_GetAttrString(io->py_impl, "update_event");
    PyObject_CallObject(meth, args);
}

Which in Python might look like:

def update_event(self, event, flags):
    if flags & READ_EVENT:
        self.reactor.addReader(event)

The 'event' parameter is an object returned by our previous 'create_event' method, which returns an instance of MyIOEvent, which contains the necessary implementation of doRead.

At some later point, the reactor detects that the underlying fileno() is available for reading, and calls the 'doRead()' method shown above. In our implementation, doRead calls 'ready_r()', which is a method implemented in C:

static void
Event_ready_r(pycbc_IOEvent *event)
{
    event->callback_info.callback(event->fd, LCB_READ_EVENT, event->callback_info.data);
}
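
To see the whole round trip in one place without a Couchbase cluster or Twisted installed, the sketch below plays both roles in pure Python. It swaps Twisted's reactor for the standard library's 'selectors' module, and everything named Toy*, 'library_watch', 'run_once' and the READ_EVENT value is hypothetical; only the shape (create_event, update_event, doRead calling ready_r) follows the walkthrough above.

```python
import selectors
import socket

READ_EVENT = 0x02   # hypothetical flag value, standing in for the C constant


class ToyIOEvent:
    """Stand-in for IOEvent: stores the fd plus the 'C' callback and its data."""

    def __init__(self):
        self.fd = -1
        self.callback = None
        self.data = None

    def fileno(self):
        return self.fd

    def ready_r(self):
        # In the real extension this is where control crosses back into C.
        self.callback(self.fd, READ_EVENT, self.data)

    def doRead(self):
        self.ready_r()


class ToyIOPS:
    """Stand-in for the IOPS object, backed by selectors instead of a reactor."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()

    def create_event(self):
        return ToyIOEvent()

    def update_event(self, event, flags):
        if flags & READ_EVENT:
            # Plays the role of reactor.addReader(event).
            self.selector.register(event, selectors.EVENT_READ, event)

    def run_once(self, timeout=1.0):
        for key, _mask in self.selector.select(timeout):
            key.data.doRead()


def library_watch(iops, sock, callback, data):
    # What the C side does: create an event, fill it in, ask to be notified.
    event = iops.create_event()
    event.fd = sock.fileno()
    event.callback = callback
    event.data = data
    iops.update_event(event, READ_EVENT)
    return event


received = []


def on_readable(fd, flags, data):
    received.append(data.recv(16))


left, right = socket.socketpair()
iops = ToyIOPS()
library_watch(iops, left, on_readable, left)
right.sendall(b"ping")
iops.run_once()
iops.selector.close()
left.close()
right.close()
print(received)
```

The event object only needs a fileno() method for the loop to watch it, which is why the same wrapper can be handed to different event loop implementations.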

Future/Deferred Connection API

In order to make the actual data access API asynchronous, I added some private parameters on the Connection object which mark it as 'asynchronous'; basically this sets a field inside the Connection object and allocates a suitable 'IOPS' instance. Once each operation has been scheduled, the extension checks whether the Connection object's 'F_ASYNC' flag is set. If it is, it does not call lcb_wait() and, rather than waiting for the result, returns an AsyncResult object (which is a subclass of MultiResult). This 'AsyncResult' object has 'errback' and 'callback' properties which are invoked when the result is ready.

Likewise, in the callback code, if the Connection 'F_ASYNC' flag is set, each time a result is received, it will invoke the relevant AsyncResult.callback or AsyncResult.errback function (depending on success or failure).

The great thing about the internal structure is that very little modification was needed to allow async operation; therefore all the stability of the sync API carries over to the new async interface.

Our simple 'get' implementation now looks like this in C:

static PyObject *
get(pycbc_Connection *self, PyObject *args)
{
  /* Do argument validation as normal */
  ...

  pycbc_MultiResult *mres;

  if (self->flags & F_ASYNC) {
    mres = PyObject_CallObject(&pycbc_AsyncResultType, NULL);
  } else {
    mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
  }

  /** Validate arguments, etc. */
  ...

  err = lcb_get(self->instance, mres, 1, &commandp);

  if (!(self->flags & F_ASYNC)) {
     /* only the synchronous path blocks here; async returns immediately */
     lcb_wait(self->instance);
  }

  return mres;
}

And likewise in the callback:

static void
get_callback(lcb_t instance, const void *cookie, lcb_error_t err, const lcb_get_resp_t *resp)
{
  pycbc_MultiResult *mres = cookie;
  pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);

  /** Setup the value result, and insert into the dictionary ... */
  ...

  if (mres->parent->flags & F_ASYNC) {
    /* it's an AsyncResult, which is a subclass */
    PyObject_CallObject( ((pycbc_AsyncResult *)mres)->callback, NULL);
    Py_DECREF(mres); /* we're not returning it */
  }
}
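
The sync/async branching can also be mimicked in pure Python, which makes the difference in behaviour easy to poke at. This is a toy model under stated assumptions: 'ToyConnection', its 'deliver' method (standing in for the event loop firing the C callback) and the F_ASYNC value are invented for the sketch.

```python
F_ASYNC = 0x01   # hypothetical flag value


class MultiResult(dict):
    """Hypothetical stand-in for the extension's MultiResult."""


class AsyncResult(MultiResult):
    """MultiResult plus the two hooks invoked once a response arrives."""

    def __init__(self):
        super().__init__()
        self.callback = None
        self.errback = None


class ToyConnection:
    def __init__(self, store, flags=0):
        self.store = store
        self.flags = flags
        self._pending = []

    def get(self, key):
        mres = AsyncResult() if (self.flags & F_ASYNC) else MultiResult()
        self._pending.append((mres, key))   # "schedule" the operation
        if not (self.flags & F_ASYNC):
            self.deliver()                  # sync mode: block until populated
        return mres

    def deliver(self):
        # Stands in for the event loop invoking the result callback.
        pending, self._pending = self._pending, []
        for mres, key in pending:
            found = key in self.store
            mres[key] = self.store.get(key)
            if self.flags & F_ASYNC:
                (mres.callback if found else mres.errback)(mres)


sync_result = ToyConnection({"foo": "bar"}).get("foo")

fired = []
async_conn = ToyConnection({"foo": "bar"}, flags=F_ASYNC)
async_result = async_conn.get("foo")
async_result.callback = fired.append
async_result.errback = fired.append
snapshot_before = dict(async_result)   # still empty: get() did not wait
async_conn.deliver()
print(sync_result, snapshot_before, fired)
```

The synchronous caller gets a populated result straight from get(), while the asynchronous caller gets an empty container and is told later through the hooks it attached.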

A new 'txcouchbase' package was added, which contains its own Connection class. During construction, this Connection class sets these internal flags. Additionally, for each operation, the returned AsyncResult object is wired to a Deferred with the equivalent of the following construct:

from twisted.internet.defer import Deferred

def get(self, *args, **kwargs):
    async_res = super(Connection, self).get(*args, **kwargs)
    d = Deferred()
    async_res.callback = d.callback
    async_res.errback = d.errback
    return d

Once the result is ready, the callback is invoked either with a MultiResult or Result object (depending on whether a single or multi variant of the operation was performed at the API).
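
The same wiring can be tried without Twisted by substituting the standard library's concurrent.futures.Future for Deferred. To be clear, this is a swapped-in analogue rather than what txcouchbase does, and 'ToyAsyncResult', 'ToyBaseConnection' and 'FutureConnection' are hypothetical names used only for this sketch.

```python
from concurrent.futures import Future


class ToyAsyncResult(dict):
    """Hypothetical stand-in for AsyncResult: a dict with two hooks."""

    def __init__(self):
        super().__init__()
        self.callback = None
        self.errback = None


class ToyBaseConnection:
    """Hypothetical base class whose get() returns an un-fired AsyncResult."""

    def __init__(self):
        self.outstanding = []

    def get(self, key):
        async_res = ToyAsyncResult()
        self.outstanding.append((key, async_res))
        return async_res


class FutureConnection(ToyBaseConnection):
    def get(self, *args, **kwargs):
        async_res = super().get(*args, **kwargs)
        fut = Future()
        # Same shape as the Deferred version: point the hooks at the future.
        async_res.callback = fut.set_result
        async_res.errback = fut.set_exception
        return fut


conn = FutureConnection()
fut = conn.get("foo")
was_done_before = fut.done()

# Pretend the event loop delivered the response for the outstanding request.
key, async_res = conn.outstanding.pop()
async_res[key] = "bar"
async_res.callback(async_res)

print(was_done_before, fut.done(), fut.result())
```

The wrapper never touches the result itself; it only forwards the two hooks, which is why the subclass stays this small.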