October 17, 2010

Writing your own storage engine for Memcached, part 3

Right now we've got an engine capable of running get and set load, but it is doing synchronous filesystem IO. We can't serve our client faster than we can read the item from disk, but we might serve other connections while we're reading the item off disk.

In this entry we're going to fix our get and store methods so that they don't block the engine API. As I've said earlier, the intention of this tutorial is to focus on the engine API. That means I'm not going to try to make an efficient design, because that could distract the focus from what I'm trying to explain. If people are interested in how we could optimize this, I could add a second part of the tutorial in the future... Just let me know.

In order to implement asynchronous operations in our engine, we need to make use of the API the server makes available to us in create_instance. Let's extend our engine structure to keep track of the server API:

 

/*
 * Per-instance state of the filesystem engine.
 */
struct fs_engine {
   /* Engine interface table. The engine functions cast the ENGINE_HANDLE
    * they receive to a struct fs_engine, so this must stay the first
    * member. */
   ENGINE_HANDLE_V1 engine;
   /* Copy of the server API returned by get_server_api() in
    * create_instance; used to reach the cookie functions. */
   <b>SERVER_HANDLE_V1 sapi;</b>
};

...

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE **handle)
{

...

   h->engine.item_set_cas = fs_item_set_cas;
   h->engine.get_item_info = fs_get_item_info;
   <b>h->sapi = *get_server_api();</b>

...

     


To implement an asynchronous function in the engine interface, the engine needs to dispatch the request to another thread before it returns ENGINE_EWOULDBLOCK from the engine function. When the backend is done processing the request, it notifies the memcached core by using the notify_io_complete function in the server interface. If an error occurred while processing the request, the memcached core will return the error message to the client. If your engine called notify_io_complete with ENGINE_SUCCESS, the memcached core will call the engine interface function once more with the same arguments as the first time.

If you look at the server API, you'll see that it has an interface for storing an engine-specific pointer. This will make our life easier when we want to implement async IO. So let's go ahead and update our fs_get method:

 

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   /* The engine state is reached through the handle passed in by the
    * core (not through the local variable being declared). */
   struct fs_engine *engine = (struct fs_engine *)handle;

   /* A non-NULL engine-specific pointer means a result was stored on the
    * cookie earlier: hand it out and clear the slot. */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   if (res != NULL) {
      *item = res;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }

...
     


The next thing we need to do is to create a function that runs asynchronously and stores the result in the engine_specific setting for the cookie. Since we're going to use async tasks for all of the engine methods, let's go ahead and create a function to run tasks in another thread:

 

/*
 * Run a task on a newly created detached thread.
 *
 * The thread is started with task->callback as its entry point and the
 * task itself as the only argument.
 *
 * Returns ENGINE_EWOULDBLOCK when the thread was started, and
 * ENGINE_FAILED when the thread attributes could not be set up or the
 * thread could not be created. The task is not freed here in either case.
 */
static ENGINE_ERROR_CODE execute(struct task *task)
{
   pthread_attr_t attr;
   pthread_t tid;

   if (pthread_attr_init(&attr) != 0) {
      return ENGINE_FAILED;
   }

   ENGINE_ERROR_CODE ret = ENGINE_EWOULDBLOCK;
   if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
       pthread_create(&tid, &attr, task->callback, task) != 0) {
      ret = ENGINE_FAILED;
   }

   /* The attribute object is only needed while creating the thread;
    * destroy it on every path so it is not leaked. */
   pthread_attr_destroy(&attr);

   return ret;
}
     


As you can see from the code, I'm going to create a new thread to execute each operation. This isn't very efficient, because creating a new thread has substantial overhead. In your design you would probably want a pool of threads to run your tasks.

The newly created thread would run the specialized callback with a pointer to the task as its only argument. So what does this task structure look like?

 

/*
 * Description of one asynchronous operation handed to a worker thread.
 * Exactly one member of the data union is meaningful, depending on which
 * operation created the task.
 */
struct task {
   struct fs_engine *engine; /* Pointer to the engine */
   const void *cookie; /* The cookie requesting the operation */
   void *(*callback)(void *arg); /* Thread entry point; receives the task */
   union {
      struct {
         /* Copy of the key. fs_get rejects keys of PATH_MAX bytes or
          * more, which leaves room for a terminating '\0'. */
         char key[PATH_MAX];
         size_t nkey; /* Number of key bytes stored in key */
      } get; /* Data used by the get operation */
      struct {
         item *item; /* The item to write */
         ENGINE_STORE_OPERATION operation; /* Requested store operation */
      } store; /* Data used by the store operation */
   } data;
};
     

So let's finish up fs_get:

 

/*
 * Engine "get" entry point, implemented asynchronously.
 *
 * If the cookie already carries an engine-specific pointer, that pointer
 * is returned through *item, the slot is cleared, and ENGINE_SUCCESS is
 * returned. Otherwise a task holding a copy of the key is handed to
 * execute() with fs_get_callback as its callback.
 *
 * Returns ENGINE_FAILED for keys of PATH_MAX bytes or more,
 * ENGINE_ENOMEM when the task cannot be allocated, and otherwise whatever
 * execute() returns. vbucket is not used.
 */
static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   struct fs_engine *self = (struct fs_engine *)handle;

   /* Second invocation: pick up the result left on the cookie. */
   void *pending = self->sapi.cookie->get_engine_specific(cookie);
   if (pending != NULL) {
      *item = pending;
      self->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }

   /* The key must fit in the task's PATH_MAX buffer with a terminator. */
   if (nkey >= PATH_MAX) {
      return ENGINE_FAILED;
   }

   struct task *job = calloc(1, sizeof(*job));
   if (job == NULL) {
      return ENGINE_ENOMEM;
   }

   job->engine = self;
   job->cookie = cookie;
   job->callback = fs_get_callback;
   job->data.get.nkey = nkey;
   memcpy(job->data.get.key, key, nkey);

   /* The task is only released here when it was not handed off. */
   ENGINE_ERROR_CODE rv = execute(job);
   if (rv == ENGINE_EWOULDBLOCK) {
      return rv;
   }

   free(job);
   return rv;
}
     


If you look at the code above, you'll see that we specify fs_get_callback as the function to execute. So let's go ahead and implement the callback:

 

static void *fs_get_callback(void *arg)
{
   struct task *task = arg;
   char *fname = task->data.get.key;
   task->data.get.key[task->data.get.nkey] = '\0';

   struct stat st;
   if (stat(fname, &st) == -1) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_KEY_ENOENT);
      free(task);
      return NULL;
   }

   struct fs_item* it = NULL;
   ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)task->engine,
                                       task->cookie, (void**)&it,
                                       task->data.get.key,
                                       task->data.get.nkey,
                                       st.st_size, 0, 0);
   if (ret != ENGINE_SUCCESS) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_ENOMEM);
      free(task);
      return NULL;
   }

   FILE *fp = fopen(fname, "r");
   if (fp == NULL) {
      fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      return NULL;
   }

   size_t nr = fread(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nr != it->ndata) {
      fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      return NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   return NULL;
}
     


As you see it's quite easy to add asynchronous support for the engine functions. Let's go ahead and do the same for fs_store:

 

static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                  const void *cookie,
                                  item* item,
                                  uint64_t *cas,
                                  ENGINE_STORE_OPERATION operation,
                                  uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   /* Check to see if this is the callback from an earlier ewouldblock */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   if (res != NULL) {
      *cas = 0;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      return ENGINE_SUCCESS;
   }


   /* Set up the callback struct */
   struct task *task = calloc(1, sizeof(*task));
   if (task == NULL) {
      return ENGINE_ENOMEM;
   }

   task->engine = (struct fs_engine *)handle;
   task->cookie = cookie;
   task->callback = fs_store_callback;
   task->data.store.item = item;
   task->data.store.operation = operation;

   ENGINE_ERROR_CODE ret = execute(task);
   if (ret != ENGINE_EWOULDBLOCK) {
      free(task);
   }
   return ret;
}


And fs_store_callback looks like the following:

 

static void *fs_store_callback(void *arg)
{
   struct task *task = arg;
   struct fs_item* it = task->data.store.item;
   char fname[it->nkey + 1];
   memcpy(fname, it->key, it->nkey);
   fname[it->nkey] = '\0';

   FILE *fp = fopen(fname, "w");
   if (fp == NULL) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      return NULL;
   }

   size_t nw = fwrite(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nw != it->ndata) {
      remove(fname);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      return NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   return NULL;
}


If you look closely at the code above you'll see that we still don't differentiate between add/set/replace, but we'll fix that in the next session.

Comments