Right now we've got an engine capable of running get and set load, but it is doing synchrounus filesystem IO. We can't serve our client faster than we can read the item from disk, but we might serve other connections while we're reading the item off disk.
In this entry we're going to fix our get
and store
method so that they don't block the engine API. As I've said earlier, the intention of this tutorial is to focus on the engine API. That means I'm not going to try to make an effective design, because that could distract the focus from what I'm trying to explain. If people are interested in how we could optimize this, I could add a second part of the tutorial in the future… Just let me know.
In order to implement asynchronous operations in our engine, we need to make use of the API the server makes available to us in create_instance
. Let's extend our engine structure to keep track of the server API:
ENGINE_HANDLE_V1 engine;
SERVER_HANDLE_V1 sapi;
};
…
MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
GET_SERVER_API get_server_api,
ENGINE_HANDLE **handle)
{
…
h->engine.item_set_cas = fs_item_set_cas;
h->engine.get_item_info = fs_get_item_info;
h->sapi = *get_server_api();
…
To implement an asynchronous function in the engine interface, the engine needs to dispatch the request to another thread before it return ENGINE_EWOULDBLOCK
from the engine function. When the backend is done processing the result, it notifies the memcached core by using the notify_io_complete
function in the server interface. If an error occurred while processing the request, the memcached core will return the error message to the client. If your engine called notify_io_complete
with ENGINE_SUCCESS
, the memcached core will call the engine interface function once more with the same argument as the
first time.
If you look at the server api, you'll see that the it got an interface for storing an engine-specific pointer. This will make our life easier when we want to implement async io. So let's go ahead and update our fs_get
method:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)engine;
void *res = engine->sapi.cookie->get_engine_specific(cookie);
if (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
return ENGINE_SUCCESS;
}
…
The next thing we need to do is to create a function that runs asynchronously and stores the result in the engine_specific setting for the cookie. Since we're going to use async tasks for all of the engine methods, let's go ahead and create a function to run tasks in another thread:
{
pthread_attr_t attr;
pthread_t tid;
if (pthread_attr_init(&attr) != 0 ||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
pthread_create(&tid, &attr, task->callback, task) != 0) {
return ENGINE_FAILED;
}
return ENGINE_EWOULDBLOCK;
}
As you can see from the code I'm going to create a new thread to execute each operation. This isn't very efficient, because creating a new thread got a substansial overhead. In your design you would probably want a pool of threads to run your tasks.
The newly created thread would run the specialized callback with a pointer to the task as it's only argument. So what does this task structure look like?
struct fs_engine *engine; /* Pointer to the engine */
const void *cookie; /* The cookie requesting the operation */
void *(*callback)(void *arg);
union {
struct {
char key[PATH_MAX];
size_t nkey;
} get; /* Data used by the get operation */
struct {
item *item;
ENGINE_STORE_OPERATION operation;
} store; /* Data used by the store operation */
} data;
};
So let's finish up fs_get
:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)handle;
/* Check to see if this is the callback from an earlier ewouldblock */
void *res = engine->sapi.cookie->get_engine_specific(cookie);
if (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
return ENGINE_SUCCESS;
}
/* We don’t support keys longer than PATH_MAX */
if (nkey >= PATH_MAX) {
return ENGINE_FAILED;
}
/* Set up the callback struct */
struct task *task = calloc(1, sizeof(*task));
if (task == NULL) {
return ENGINE_ENOMEM;
}
task->engine = (struct fs_engine *)handle;
task->cookie = cookie;
task->callback = fs_get_callback;
memcpy(task->data.get.key, key, nkey);
task->data.get.nkey = nkey;
ENGINE_ERROR_CODE ret = execute(task);
if (ret != ENGINE_EWOULDBLOCK) {
free(task);
}
return ret;
}
If you look at the code above, you'll see that we specify gs_get_callback
as the function to execute. So let's go ahead and implement the callback:
{
struct task *task = arg;
char *fname = task->data.get.key;
task->data.get.key[task->data.get.nkey] = ‘