January 23, 2012

So, how do I use this libcouchbase?

Some of you may have noticed that we released Couchbase 1.8 earlier today, and a new set of smart clients for various languages. For me personally this is a milestone, because libcouchbase is now a supported client for the C language.

So why do I care about that? Well, libcouchbase grew out of my need to easily test various components of the server. Since I did most of my development on the server components implemented in C, it made sense for me to use C for my testing.

I’ve received some questions on how libcouchbase works in a multithreaded context, so I should probably start off by clarifying that: libcouchbase doesn't use any form of locking to protect its internal data structures, but that doesn't mean you can't use libcouchbase in a multithreaded program. All it means is that you as a client user must either use locking to protect yourself from accessing the same libcouchbase instance from multiple threads at the same time, or just let each thread operate on its own instance of libcouchbase. One easy way to solve this is to keep a "pool" of libcouchbase instances: each thread pops an instance from the pool whenever it needs to access a Couchbase server, and pushes it back when it is done. Access to this pool should be protected with a lock (but I guess you figured that out ;-)).
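To make the pool idea a bit more concrete, here is a minimal sketch of such a pool using a pthread mutex. It is not part of libcouchbase or of the vacuum example: the names instance_pool, pool_init, pool_push, pool_pop and POOL_CAPACITY are made up for illustration, and the pool stores opaque pointers where a real program would store its libcouchbase instances.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define POOL_CAPACITY 8   /* hypothetical limit, chosen for illustration */

/* A fixed-size stack of opaque handles protected by a mutex.
 * In a real program each handle would be a libcouchbase instance. */
struct instance_pool {
    pthread_mutex_t lock;
    void *items[POOL_CAPACITY];
    size_t count;
};

static void pool_init(struct instance_pool *pool)
{
    assert(pool != NULL);
    pthread_mutex_init(&pool->lock, NULL);
    pool->count = 0;
}

/* Return an instance to the pool. Returns 0 on success, -1 if the pool is full. */
static int pool_push(struct instance_pool *pool, void *instance)
{
    int ret = -1;
    pthread_mutex_lock(&pool->lock);
    if (pool->count < POOL_CAPACITY) {
        pool->items[pool->count++] = instance;
        ret = 0;
    }
    pthread_mutex_unlock(&pool->lock);
    return ret;
}

/* Borrow an instance from the pool, or get NULL if none is available. */
static void *pool_pop(struct instance_pool *pool)
{
    void *instance = NULL;
    pthread_mutex_lock(&pool->lock);
    if (pool->count > 0) {
        instance = pool->items[--pool->count];
    }
    pthread_mutex_unlock(&pool->lock);
    return instance;
}
```

A thread would call pool_pop() before talking to the cluster and pool_push() afterwards. Only the pop and push themselves hold the lock, so a borrowed instance is used by exactly one thread at a time without any locking around the actual operations.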

In this blog post I'll create a demo program you may use to upload JSON documents into a Couchbase server. You'll find the complete source available at https://github.com/trondn/vacuum if you would like to try the example.

The idea of this program is that it will "monitor" a directory and upload all files appearing there into a Couchbase cluster. I'm pretty sure most of you are already thinking: "how do we do that in a portable way?". That's not an easy task, so I'm not even going to try. Instead I'll write it in a semi-portable way, so that it shouldn't be that hard to implement on other platforms. That means that I'm accepting the following limitations:

  • I'm using opendir and readdir to traverse the directory. This can easily be reimplemented with FindFirstFile and FindNextFile on Microsoft Windows.
  • Monitoring the directory means that I'm going to scan the directory, then sleep a given number of seconds before running another scan. I know some platforms support subscribing to changes in the filesystem, but I'm not going to spend time on that (at least not right now ;-)).
  • To avoid file locking or accessing the file while others are writing the file, the clients should write the file into the directory with a leading "dot" in the filename, and then rename the file when they are done. The program ignores all files starting with a dot.
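The scan and the "dot then rename" convention from the list above can be sketched roughly like this. This is not the code from the vacuum repository: count_visible_files and publish_file are hypothetical helpers written for this illustration, and the fixed 1024-byte path buffers are an arbitrary simplification.

```c
#include <assert.h>
#include <dirent.h>
#include <stdio.h>

/* Count the entries in `dir` whose names do not start with a dot.
 * Returns -1 if the directory cannot be opened. */
static int count_visible_files(const char *dir)
{
    DIR *dp = opendir(dir);
    if (dp == NULL) {
        return -1;
    }
    int count = 0;
    struct dirent *de;
    while ((de = readdir(dp)) != NULL) {
        if (de->d_name[0] == '.') {
            continue;  /* skips ".", ".." and files still being written */
        }
        ++count;
    }
    closedir(dp);
    return count;
}

/* Writer side: create ".<name>" first, then rename it to "<name>" once the
 * content is complete. Returns 0 on success, -1 on failure. */
static int publish_file(const char *dir, const char *name, const char *content)
{
    char tmp[1024];
    char dest[1024];
    assert(dir != NULL && name != NULL && content != NULL);

    int n = snprintf(tmp, sizeof(tmp), "%s/.%s", dir, name);
    if (n < 0 || (size_t)n >= sizeof(tmp)) {
        return -1;  /* path too long */
    }
    n = snprintf(dest, sizeof(dest), "%s/%s", dir, name);
    if (n < 0 || (size_t)n >= sizeof(dest)) {
        return -1;  /* path too long */
    }

    FILE *fp = fopen(tmp, "w");
    if (fp == NULL) {
        return -1;
    }
    int ok = (fputs(content, fp) != EOF);
    if (fclose(fp) != 0) {
        ok = 0;
    }
    if (!ok || rename(tmp, dest) != 0) {
        remove(tmp);  /* don't leave a half-written hidden file behind */
        return -1;
    }
    return 0;
}
```

Because rename() within the same directory replaces the name in one step on POSIX systems, the scanner either sees no file at all or the complete file, never a partially written one.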

So let's jump to the code. The first piece of code that might be interesting to look at would be where we create the libcouchbase instance in main():

    instance = libcouchbase_create(host, user, passwd, bucket, NULL);
    if (instance == NULL) {
        fprintf(stderr, "Failed to create couchbase instance\n");
        exit(EXIT_FAILURE);
    }

The above code snippet creates the libcouchbase instance. There is no way you can use a static structure for this, because doing so would make it incredibly hard to maintain binary compatibility. I like to be able to fix bugs within the library and release new versions you may use without having to recompile your program, and hiding the internal data structures from the clients makes it easier to ensure that the clients don't depend on their size. The first parameter to libcouchbase_create is the host name (and port) of the REST interface of the Couchbase server (default: localhost:8091). The second and third parameters are the credentials you'd like to use to connect to the REST port to get the pool information (default is to not authenticate). The fourth parameter is the bucket you'd like to connect to, and if you don't specify a bucket you'll end up in the "default bucket". The fifth argument is a special object you may want to use if you are going to use "advanced" features in libcouchbase. Most users will probably just use the defaults and pass NULL here.

The next thing we need to do is to set up some callback handlers to be able to figure out what happens. In the example we're only going to use one operation (to load data into the cache), so we'll need to set up a handler to catch the result of storage operations. Unfortunately we may also encounter problems, so we need to set up an error handler (we'll get back to how it works in a bit).

    libcouchbase_set_storage_callback(instance, storage_callback);
    libcouchbase_set_error_callback(instance, error_callback);

Now that we've created and initialized the instance, we need to try to connect to the Couchbase cluster:

    libcouchbase_error_t ret = libcouchbase_connect(instance);
    if (ret != LIBCOUCHBASE_SUCCESS) {
        fprintf(stderr, "Failed to connect: %s\n",
                libcouchbase_strerror(instance, ret));
        exit(EXIT_FAILURE);
    }

Because libcouchbase is fully asynchronous, all that happened above was that we initiated the connect. That means that we need to wait for the instance to connect to the Couchbase cluster and to the correct bucket. If our program had other work to do, now would be the time to do it, but since we don't have any other initialization to do we can just wait for the connect to complete:

    libcouchbase_wait(instance);

One of the "cool" features we've got in libcouchbase is that it provides an internal statistics interface, so we may tell it to collect timing information of the operations with the following snippet:

   if ((ret = libcouchbase_enable_timings(instance)) != LIBCOUCHBASE_SUCCESS) {
      fprintf(stderr, "Failed to enable timings: %s\n",
              libcouchbase_strerror(instance, ret));
   }

Our program is now fully initialized, and we can enter the main loop, which looks pretty much like this:

   while (forever)
   {
      process_files();
      sleep(nsec);
   }

So what does our process_files() look like? I'm not going to make the example too big by pasting all of it, but the first piece in there looks like:

   if (de->d_name[0] == '.') {
       if (strcmp(de->d_name, ".dump_stats") == 0) {
           fprintf(stdout, "Dumping stats:\n");
           libcouchbase_get_timings(instance, stdout, timings_callback);
           fprintf(stdout, "----\n");
           remove(de->d_name);
       }
       continue;
   }

As you see from the above code snippet we ignore all files whose names start with a '.', except for the file named ".dump_stats". Whenever we see that file we dump the internal timing statistics by using the timings_callback (I'll get back to that later).

The next thing we do is to try to read the file into memory and decode its JSON before we try to get the "_id" field to use as a key. If all of that succeeds, we try to store the data in Couchbase with:
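The "read the file into memory" step is not shown in this post, so here is a minimal sketch of one way to do it with plain stdio. The helper name read_file is made up for this illustration and is not taken from the vacuum source; the JSON decoding and the lookup of "_id" are deliberately left out.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Read the whole file into a malloc'ed, NUL-terminated buffer.
 * On success the buffer is returned and its length is stored in *size;
 * the caller must free() the buffer. Returns NULL on any failure. */
static char *read_file(const char *path, size_t *size)
{
    assert(path != NULL && size != NULL);

    FILE *fp = fopen(path, "rb");
    if (fp == NULL) {
        return NULL;
    }

    char *data = NULL;
    long len = -1;

    /* Seek to the end to learn the size, then rewind. */
    if (fseek(fp, 0, SEEK_END) == 0) {
        len = ftell(fp);
    }
    if (len >= 0 && fseek(fp, 0, SEEK_SET) == 0) {
        data = malloc((size_t)len + 1);
    }

    if (data != NULL) {
        if (fread(data, 1, (size_t)len, fp) == (size_t)len) {
            data[len] = '\0';
            *size = (size_t)len;
        } else {
            free(data);   /* short read: treat it as a failure */
            data = NULL;
        }
    }

    fclose(fp);
    return data;
}
```

The returned pointer and size correspond to what the snippet below the paragraph passes as ptr and size; the extra NUL terminator is only there to make the buffer convenient to hand to a string-based JSON parser.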

      int error = 0;
      ret = libcouchbase_store(instance, &error, LIBCOUCHBASE_SET,
                               id->valuestring, strlen(id->valuestring),
                               ptr, size, 0, 0, 0);
      if (ret == LIBCOUCHBASE_SUCCESS) {
         libcouchbase_wait(instance);
      } else {
         error = 1;
      }

The &error piece here is quite interesting. It is a "cookie" passed to the callback, so that I may know if I encountered a problem or not. You'll see how I'm using it when I discuss the storage_callback below.

This is basically all of the important logic in the example. I promised that I would get back to the different callbacks, so let's start by looking at the error callback:

   static void error_callback(libcouchbase_t instance,
                              libcouchbase_error_t error,
                              const char *errinfo)
   {
       /* Ignore timeouts... */
       if (error != LIBCOUCHBASE_ETIMEDOUT) {
           fprintf(stderr, "\rFATAL ERROR: %s\n",
                   libcouchbase_strerror(instance, error));
           if (errinfo && strlen(errinfo) != 0) {
               fprintf(stderr, "\t\"%s\"\n", errinfo);
           }
           exit(EXIT_FAILURE);
       }
   }

As you see from the above snippet, libcouchbase will call the error_callback whenever a timeout occurs, but in that case we just want to retry the operation, so we ignore timeouts. If we encounter a real error we print out an error message and terminate the program.

The next callback we use is the storage_callback. It is called when the store operation completes, so it is the right place for us to figure out if an error occurred while storing the data. Our callback looks like:

   static void storage_callback(libcouchbase_t instance,
                                const void *cookie,
                                libcouchbase_storage_t operation,
                                libcouchbase_error_t err,
                                const void *key, size_t nkey,
                                uint64_t cas)
   {
       int *error = (void*)cookie;
       if (err == LIBCOUCHBASE_SUCCESS) {
           *error = 0;
       } else {
           *error = 1;
           fprintf(stderr, "Failed to store \"");
           fwrite(key, 1, nkey, stderr);
           fprintf(stderr, "\": %s\n",
                   libcouchbase_strerror(instance, err));
           fflush(stderr);
       }
   }

As you see we're storing the result of the operation in the integer passed as the cookie. The observant reader may see that we might as well unlink the file and release the memory from within the callback (if we provided that information as the cookie instead ;))
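A rough sketch of that idea could look like the following. None of this is in the vacuum source: struct upload_cookie and finish_upload are hypothetical names, and keeping the file around on failure so that a later scan retries it is just one possible policy. A storage callback would pass its cookie and the outcome of the store on to a helper like this.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical cookie: everything the callback needs to clean up. */
struct upload_cookie {
    const char *path;  /* file the document was read from */
    void *data;        /* malloc'ed copy of the file content */
    int error;         /* set by the helper: 0 on success, 1 on failure */
};

/* Hypothetical helper a storage callback could call with its cookie and
 * a flag telling whether the store succeeded. */
static void finish_upload(const void *cookie, int success)
{
    /* The callback receives the cookie as const void *, so cast it back. */
    struct upload_cookie *uc = (struct upload_cookie *)cookie;
    assert(uc != NULL);

    /* The document content is no longer needed either way. */
    free(uc->data);
    uc->data = NULL;

    if (success) {
        /* The document is stored, so the source file can go away. */
        uc->error = (remove(uc->path) == 0) ? 0 : 1;
    } else {
        /* Keep the file so that a later scan can try again. */
        uc->error = 1;
    }
}
```

With a cookie like this the code that issues the store no longer has to clean up after the wait returns; it only needs to look at the error field.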

The last callback to cover is the timings callback we're using to dump out the timing statistics.

   static void timings_callback(libcouchbase_t instance, const void *cookie,
                                libcouchbase_timeunit_t timeunit,
                                uint32_t min, uint32_t max,
                                uint32_t total, uint32_t maxtotal)
   {
      char buffer[1024];
      int offset = sprintf(buffer, "[%3u - %3u]", min, max);
      switch (timeunit) {
      case LIBCOUCHBASE_TIMEUNIT_NSEC:
         offset += sprintf(buffer + offset, "ns");
         break;
      case LIBCOUCHBASE_TIMEUNIT_USEC:
         offset += sprintf(buffer + offset, "us");
         break;
      case LIBCOUCHBASE_TIMEUNIT_MSEC:
         offset += sprintf(buffer + offset, "ms");
         break;
      case LIBCOUCHBASE_TIMEUNIT_SEC:
         offset += sprintf(buffer + offset, "s");
         break;
      default:
         ;
      }

      int num = (float)40.0 * (float)total / (float)maxtotal;
      offset += sprintf(buffer + offset, " |");
      for (int ii = 0; ii < num; ++ii) {
         offset += sprintf(buffer + offset, "#");
      }

      offset += sprintf(buffer + offset, " - %u\n", total);
      fputs(buffer, (FILE*)cookie);
   }

When you request the timings from libcouchbase it reports all of the timing metrics collected by calling the timings callback once per range. As you can see from the API you'll get the minimum and maximum values of the range, and the number of operations performed within that range. These metrics should not be considered exact numbers, because they depend on what you do in your client code from the time you call the operation until you call libcouchbase_wait for the operation to complete.

So let's go ahead and run the program. I've prepopulated /var/spool/vacuum with a number of JSON files, so that the program has something to do.

trond@illumos ~> ./vacuum
sleeping 3 secs before retry..

From another window I execute the command:

trond@illumos ~> touch /var/spool/vacuum/.dump_stats

And when the timer expires in the first window, it prints out:

Dumping stats:
[ 60 -  69]us |######################################## - 18
[ 70 -  79]us |## - 1
[240 - 249]us |## - 1
----
sleeping 3 secs before retry..


Hopefully this blog post revealed how easy it is to use libcouchbase to communicate with a Couchbase cluster. We've got various clients for other programming languages like PHP and Ruby built on top of libcouchbase, so I can promise you that you'll see more functionality added!
