Blog Post

Writing your own storage engine for Memcached, part 2

Trond Norbye of Couchbase Published

In the previous blog post I described the engine initialization and destruction. This blog post will cover the memory allocation model in the engine interface.

The memcached core is responsible for allocating all of the memory it needs for its connections (send / receive buffers etc), and the engine is responsible for allocating (and freeing) all of the memory it needs to keep track of the items. The engines shouldn't have to care about the memory the core allocates (and uses), but the core will access the memory managed by the engine.

When the memcached core is about to store a new item it needs to get a (as of today contiguous) buffer to store the data for the item. The core will try to allocate this buffer by calling the allocate function in the API. So let's start extending our example code by adding our own implementation of the allocate function. The first thing we need to do is to add it to the engine descriptor we return from create_instance. We're going to add a number of functions in today's entry, so let's just map all of them while we're at it:

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE **handle)
{
   [ ... cut ... ]
  /*
   * Map the API entry points to our functions that implement them.
   */
   h->engine.initialize = fs_initialize;
   h->engine.destroy = fs_destroy;
   /* Entry points added in this post: */
   h->engine.get_info = fs_get_info;
   h->engine.allocate = fs_allocate;
   h->engine.remove = fs_item_delete;
   h->engine.release = fs_item_release;
   h->engine.get = fs_get;
   h->engine.get_stats = fs_get_stats;
   h->engine.reset_stats = fs_reset_stats;
   h->engine.store = fs_store;
   h->engine.flush = fs_flush;
   h->engine.unknown_command = fs_unknown_command;
   h->engine.item_set_cas = fs_item_set_cas;
   h->engine.get_item_info = fs_get_item_info;
     

The next thing we need to do is to create a data structure to keep the information we need. The purpose of this tutorial isn't to create a memory efficient implementation, but to exercise the API. So let's just create the following struct:

/*
 * Per-item bookkeeping for the filesystem engine. One instance describes
 * a single key/value pair while it is handed back and forth between the
 * memcached core and the engine. Not meant to be memory efficient.
 */
struct fs_item {
   void *key;          /* copy of the key bytes (not NUL terminated) */
   size_t nkey;        /* number of bytes in key */
   void *data;         /* buffer holding the value */
   size_t ndata;       /* number of bytes in data */
   int flags;          /* flags supplied when the item was allocated */
   rel_time_t exptime; /* expiry time supplied when the item was allocated */
};
     

Our implementation of allocate would then look like:

 

static ENGINE_ERROR_CODE fs_allocate(ENGINE_HANDLE* handle,
                                     const void* cookie,
                                     item **item,
                                     const void* key,
                                     const size_t nkey,
                                     const size_t nbytes,
                                     const int flags,
                                     const rel_time_t exptime)
{
   struct fs_item *it = malloc(sizeof(struct fs_item));
   if (it == NULL) {
      return ENGINE_ENOMEM;
   }
   it->flags = flags;
   it->exptime = exptime;
   it->nkey = nkey;
   it->ndata = nbytes;
   it->key = malloc(nkey);
   it->data = malloc(nbytes);
   if (it->key == NULL || it->data == NULL) {
      free(it->key);
      free(it->data);
      free(it);
      return ENGINE_ENOMEM;
   }
   memcpy(it->key, key, nkey);
   *item = it;
   return ENGINE_SUCCESS;
}
     

If you look at the implementation above you'll see that we didn't return the pointer to the actual memory for the data storage to the memcached core. To get that address memcached will call get_item_info in the API. So let's implement that:

/*
 * Fill in item_info with a description of the given item so that the
 * memcached core knows where the key and the value are located.
 *
 * On entry item_info->nvalue holds the number of value fragments the
 * caller has room for; on success it is set to the number used (1).
 *
 * Returns true on success, false if there is no room for a fragment.
 */
static bool fs_get_item_info(ENGINE_HANDLE *handle, const void *cookie,
                             const item* item, item_info *item_info)
{
   /* We need room for exactly one fragment */
   if (item_info->nvalue < 1) {
      return false;
   }

   const struct fs_item *entry = (const struct fs_item *)item;

   /* Where the key lives */
   item_info->key = entry->key;
   item_info->nkey = entry->nkey;

   /* The value is stored in a single fragment */
   item_info->nbytes = entry->ndata;
   item_info->nvalue = 1;
   item_info->value[0].iov_base = entry->data;
   item_info->value[0].iov_len = entry->ndata;

   /* Metadata */
   item_info->flags = entry->flags;
   item_info->exptime = entry->exptime;
   item_info->cas = 0;   /* Not supported */
   item_info->clsid = 0; /* Not supported */

   return true;
}
     

The get_item_info function is important and deserves more explanation. If you look in the engine API the "item" is defined as a void pointer, and we defined our own item-structure to keep track of the information we need on a per item basis. The memcached core will however need to know
where to read / write the memory for the key and the data going to / coming from a client. To do so it will invoke get_item_info. If you look closely at our implementation of fs_get_item_info you will see that the first thing I'm doing is to check that item_info->nvalue contains at least 1
element. Right now it will always be one, but the intention is that we're going to support scattered IO.

When the core is done moving the data it received over the wire into the item, it will try to store the item in our engine by calling store. So let's go ahead and create a simple implementation (we'll extend it later on in the tutorial):

static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                  const void *cookie,
                                  item* item,
                                  uint64_t *cas,
                                  ENGINE_STORE_OPERATION operation,
                                  uint16_t vbucket)
{
   struct fs_item* it = item;
   char fname[it->nkey + 1];
   memcpy(fname, it->key, it->nkey);
   fname[it->nkey] = '\0';
   FILE *fp = fopen(fname, "w");
   if (fp == NULL) {
      return ENGINE_NOT_STORED;
   }
   size_t nw = fwrite(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nw != it->ndata) {
      remove(fname);
      return ENGINE_NOT_STORED;
   }

   *cas = 0;
   return ENGINE_SUCCESS;
}
     

If you look at the implementation above you will see that it doesn't implement the correct semantics for add/replace/set etc, and it will block memcached while we're doing file IO. Don't worry about that right now, because we'll get back to it.

When the core is done using the item it allocated, it will release the item by calling the release function in the API. The engine may reuse the item's storage for something else at this time. So let's hook up our release implementation:

/*
 * Release an item: free the key buffer, the data buffer and the item
 * structure itself. The item must not be used after this call.
 */
static void fs_item_release(ENGINE_HANDLE* handle,
                            const void *cookie,
                            item* item)
{
   struct fs_item *victim = item;

   /* Release the buffers owned by the item before the item itself */
   free(victim->data);
   free(victim->key);
   free(victim);
}
     

Now we've created all of the code to successfully store items in our engine, but we can't read any of them back. So let's implement get:

/**
 * Look up an item by reading the file named after the key in the current
 * working directory into a newly allocated item.
 *
 * On success the caller owns the returned item and must hand it back
 * through the release function when done with it.
 *
 * @param handle  the engine handle
 * @param cookie  the cookie of the calling connection
 * @param item    where to store the pointer to the item on success
 * @param key     the key to look up
 * @param nkey    the number of bytes in key
 * @param vbucket the vbucket for the key (ignored)
 * @return ENGINE_SUCCESS on success, ENGINE_KEY_ENOENT if no file exists
 *         for the key, the error from fs_allocate if the item could not
 *         be allocated, ENGINE_FAILED if the file could not be read
 */
static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{

   char fname[nkey + 1];
   memcpy(fname, key, nkey);
   fname[nkey] = '\0';

   struct stat st;
   if (stat(fname, &st) == -1) {
      /* No file for this key: report a miss rather than a failed store */
      return ENGINE_KEY_ENOENT;
   }

   /*
    * The parameter named "item" hides the type of the same name inside
    * this function, so receive the new item through a plain void pointer
    * (the engine API defines item as void) instead of casting the address
    * of a struct pointer.
    */
   void *allocated = NULL;
   ENGINE_ERROR_CODE ret = fs_allocate(handle, cookie, &allocated, key, nkey,
                                       st.st_size, 0, 0);
   if (ret != ENGINE_SUCCESS) {
      return ret;
   }
   struct fs_item *it = allocated;

   /* The value is opaque bytes, so open the file in binary mode */
   FILE *fp = fopen(fname, "rb");
   if (fp == NULL) {
      fs_item_release(handle, cookie, it);
      return ENGINE_FAILED;
   }

   size_t nr = fread(it->data, 1, it->ndata, fp);
   fclose(fp);
   if (nr != it->ndata) {
      /* The file shrank or could not be read in full */
      fs_item_release(handle, cookie, it);
      return ENGINE_FAILED;
   }

   *item = it;
   return ENGINE_SUCCESS;
}
     

Let's add a dummy implementation for the rest of the API and try to load and test the engine:

/*
 * Return the static description of this engine. The returned structure
 * is shared by all callers and advertises no optional features.
 */
static const engine_info* fs_get_info(ENGINE_HANDLE* handle)
{
   static engine_info descriptor = {
      .num_features = 0,
      .description = "Filesystem engine v0.1"
   };

   (void)handle;
   return &descriptor;
}

/*
 * Placeholder for deleting an item: nothing is removed and every key is
 * reported as not found.
 */
static ENGINE_ERROR_CODE fs_item_delete(ENGINE_HANDLE* handle,
                                        const void* cookie,
                                        const void* key,
                                        const size_t nkey,
                                        uint64_t cas,
                                        uint16_t vbucket)
{
   /* None of the arguments are used by the placeholder */
   (void)handle;
   (void)cookie;
   (void)key;
   (void)nkey;
   (void)cas;
   (void)vbucket;

   return ENGINE_KEY_ENOENT;
}

static ENGINE_ERROR_CODE fs_get_stats(ENGINE_HANDLE* handle,
                                      const void* cookie,
                                      const char* stat_key,
                                      int nkey,
                                      ADD_STAT add_stat)
{
   return ENGINE_SUCCESS;
}

/*
 * Placeholder for flushing the engine: nothing is removed and the
 * request is reported as successful.
 */
static ENGINE_ERROR_CODE fs_flush(ENGINE_HANDLE* handle,
                                  const void* cookie, time_t when)
{
   /* None of the arguments are used by the placeholder */
   (void)handle;
   (void)cookie;
   (void)when;

   return ENGINE_SUCCESS;
}

/*
 * Placeholder for resetting statistics: the engine keeps none, so there
 * is nothing to do.
 */
static void fs_reset_stats(ENGINE_HANDLE* handle, const void *cookie)
{
   /* None of the arguments are used by the placeholder */
   (void)handle;
   (void)cookie;
}

static ENGINE_ERROR_CODE fs_unknown_command(ENGINE_HANDLE* handle,
                                            const void* cookie,
                                            protocol_binary_request_header *request,
                                            ADD_RESPONSE response)
{
   return ENGINE_ENOTSUP;
}

/*
 * Placeholder for setting the CAS value of an item: the value is
 * discarded and the item is left untouched.
 */
static void fs_item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
                            item* item, uint64_t val)
{
   /* None of the arguments are used by the placeholder */
   (void)handle;
   (void)cookie;
   (void)item;
   (void)val;
}

     

So let's go ahead and try our engine:

trond@opensolaris> /opt/memcached/bin/memcached -E .libs/fs_engine.so
     

From another terminal I'm typing in:

trond@opensolaris> telnet localhost 11211
Trying ::1...
Connected to opensolaris.
Escape character is '^]'.
add test 0 0 4
test
STORED
get test
VALUE test 0 4
test
END
quit
Connection to storm closed by foreign host.
     

Terminate memcached by pressing ctrl-c, and look in the current directory:

trond@opensolaris> ls -l test
-rw-r--r--   1 trond    users          6 Oct  8 12:56 test
trond@opensolaris> cat test
test
     

That's all for this time.