Intro

Couchbase Kafka Connector 1.2.0 just shipped. Along with the various bug fixes, there is new sample code for a Kafka consumer in addition to the Kafka producer that was previously available. To quickly review the terms:

  • A Kafka producer writes data to Kafka, so it’s a source of messages from Kafka’s perspective.
  • A consumer in Kafka terminology is a process that subscribes to topics and then does something with the feed of published messages that are emitted from a Kafka cluster. It’s basically a sink.

In this post, you’ll get up and running with a “Hello World!”-style sample Kafka consumer that writes to Couchbase. Along the way, you’ll also get a sandbox environment with a Kafka broker and a single-node Couchbase Server so that you can actually run and modify the sample consumer and producer.

Installing Prerequisites

The samples are part of the Couchbase Kafka Connector source tree. To get them, just clone the whole repository:

Now, let’s set up your testing environment using preconfigured Kafka and Couchbase Server images. You need to install Vagrant, VirtualBox, and Ansible to set them up locally. If you already have these services running somewhere else, adjust the host addresses throughout this guide accordingly.

Check versions of dependencies:

You can assign human-readable names to the boxes by using the plugin for Vagrant. If you don’t already have it installed, use the following command:

Now you’re ready to provision the servers and get running:

Note: If a server fails to install due to timeouts, retry “vagrant up” after a few minutes and it may work.

Verify that the hosts are responding:

If you navigate to the Couchbase Server Admin UI, you should see your single-node Couchbase Server configured with the credentials Administrator/password.

Building the Samples

To avoid any classpath issues, use Maven to create a self-contained JAR file for each sample application.

The generator application is a minimal CLI application. It uses the Couchbase Java SDK to wrap input lines from STDIN into JSON documents and sends them to the “default” bucket on Couchbase Server:

The producer attaches to Couchbase Server and transmits all mutations to Kafka. This application uses the couchbase-kafka-connector project behind the scenes.

The consumer is a typical Kafka consumer, which by default just prints every incoming message on the topic “default” to STDOUT.

Running the Samples

Now that you have everything prepared, it’s time to run all the samples. You’ll need three different shell sessions because each sample runs as a process until you stop it. We’ll assume that you are in the /tmp/kafka-connector/samples directory.

First, start your generator:

It should output the connection settings and then drop to a command prompt (>). You can type anything there and verify that the documents are being created properly by looking in the Couchbase Server Admin UI:

Documents from generator in the bucket

At this point, you can run the connector example:

For every line you type in the generator, you will see a line from the producer like this:

The sample writes this line in the filter class implementation, just before sending the payload to Kafka. Let’s check how Kafka receives these messages.

You can continue playing with it as long as all three services are running.

Developing with Couchbase Kafka Connector

Let’s move on and take a look at the code. All three applications are easy to experiment with. For example, the generator fits in just a few lines:

Basically, the generator opens a connection to the bucket “default” on your “couchbase1.vagrant” instance and writes your messages to random keys. You can extend it to send other types of events. Another thing you might want to try is removing keys.
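The wrapping step can be sketched without the SDK. What follows is a rough, standalone illustration rather than the sample’s actual code: the class name LineWrapper, the “message” field name, and the use of UUIDs as random keys are all assumptions made here, and the write to the bucket itself is left out.

```java
import java.util.UUID;

// Hypothetical helper, not part of the connector samples: turns a raw input
// line into a JSON body and produces a random document key for it.
final class LineWrapper {
    // Escape the characters that would break a JSON string literal.
    static String escape(String raw) {
        StringBuilder sb = new StringBuilder();
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Wrap one line of input into a single-field JSON document.
    static String toJson(String line) {
        return "{\"message\":\"" + escape(line) + "\"}";
    }

    // Random key, so repeated lines never overwrite each other.
    static String randomKey() {
        return UUID.randomUUID().toString();
    }
}
```

In a real generator, each pair of randomKey() and toJson(line) would be handed to the SDK’s write call for the bucket. Removing keys, as suggested above, would need the generator to remember the keys it has issued.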

By default, Couchbase Connector for Kafka runs in server mode, where it borrows the active thread and actively listens to Couchbase Server for new events. There are several points where you can apply your own ideas or changes. The most obvious one is the configuration builder, where you not only specify the credentials and addresses of the services you are connecting to, but can also specify various serializer and filter classes.

The sample application implements several of them. The filter class is the simplest:

Here you can put in any custom checks you want. If pass() returns false, the connector discards the message and won’t send it on to Kafka.
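To make the pass/discard contract concrete, here is a minimal standalone sketch. It does not use the connector’s own classes: Event and EventFilter are simplified stand-ins invented for this illustration, and the “tmp::” key prefix is an arbitrary example policy.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the connector's event type (hypothetical; only what the sketch needs).
final class Event {
    final String key;
    final boolean mutation;

    Event(String key, boolean mutation) {
        this.key = key;
        this.mutation = mutation;
    }
}

// Stand-in for the filter contract: true means forward to Kafka, false means drop.
interface EventFilter {
    boolean pass(Event event);
}

// Example policy (an assumption): forward only mutations whose key
// does not start with "tmp::".
final class MutationsOnlyFilter implements EventFilter {
    @Override
    public boolean pass(Event event) {
        return event.mutation && !event.key.startsWith("tmp::");
    }
}

final class FilterDemo {
    // Mimics how a connector would act on the filter's answer.
    static List<Event> forwardable(List<Event> events, EventFilter filter) {
        List<Event> out = new ArrayList<>();
        for (Event e : events) {
            if (filter.pass(e)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Keeping the policy in its own small class means you can swap it through configuration without touching the rest of the pipeline.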

The default encoder, which comes with the connector distribution, tries to represent every message as JSON. That probably is not what you need, so you can apply any conversion to the DCPEvent instance and return a byte array, which will be stored in Kafka. In this example, we just convert events to their string representation.
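The “string representation as bytes” idea looks roughly like this. The sketch is standalone: SampleEvent stands in for the connector’s event type and ToStringEncoder is a hypothetical name, so treat both as assumptions rather than the connector’s API.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for the connector's event type (hypothetical, for illustration only).
final class SampleEvent {
    final String key;
    final String content;

    SampleEvent(String key, String content) {
        this.key = key;
        this.content = content;
    }

    @Override
    public String toString() {
        return "SampleEvent{key=" + key + ", content=" + content + "}";
    }
}

// Hypothetical encoder: the event's string form, encoded as UTF-8 bytes.
final class ToStringEncoder {
    byte[] toBytes(SampleEvent event) {
        // Always name the charset; the platform default varies between hosts.
        return event.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Whatever format you choose here is what every consumer of the topic has to parse, so it is worth settling on it before other applications start reading.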

A more advanced setting is the StateSerializer interface. By implementing it, you can control how the library tracks stream cursors (i.e. the sequence numbers for every partition inside Couchbase Server), and whether it resumes from them after a connector restart. There is a Zookeeper implementation of the state serializer in the distribution. Here in the sample, we’ve implemented NullStateSerializer, which doesn’t persist anything but does show a minimal implementation.
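The difference between persisting nothing and tracking cursors can be shown with a simplified model. The CursorStore interface below is a hypothetical stand-in loosely based on the description above, not the connector’s StateSerializer signature, and it tracks only one sequence number per partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical contract: save the last seen sequence number for a partition
// and read it back later. A result of 0 means "start from the beginning".
interface CursorStore {
    void dump(short partition, long sequenceNumber);
    long load(short partition);
}

// Persists nothing, so every restart begins from sequence number 0.
final class NullCursorStore implements CursorStore {
    @Override
    public void dump(short partition, long sequenceNumber) {
        // Intentionally empty.
    }

    @Override
    public long load(short partition) {
        return 0L;
    }
}

// Keeps cursors in memory; they survive reconnects within one process only.
final class InMemoryCursorStore implements CursorStore {
    private final Map<Short, Long> cursors = new HashMap<>();

    @Override
    public void dump(short partition, long sequenceNumber) {
        // Never move a cursor backwards.
        cursors.merge(partition, sequenceNumber, Long::max);
    }

    @Override
    public long load(short partition) {
        return cursors.getOrDefault(partition, 0L);
    }
}
```

A durable variant would write the same map to external storage, which is the role the Zookeeper implementation plays in the distribution.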

The last component of your demo cluster is the Kafka consumer, which is a pretty typical instance of a consumer. It consists of two parts: AbstractConsumer, which implements bootstrap and positioning on the Kafka topic, and PrintConsumer, which carries your “business logic”, or in this case just outputs every message that AbstractConsumer passes to it:
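The division of labour between a base class and a printing subclass can be sketched in plain Java. This is a simplified illustration and not the sample’s source: SketchConsumer, SketchPrintConsumer, the handleMessage hook, and the in-memory list standing in for a Kafka partition are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// The base class owns positioning and iteration; subclasses own the business logic.
abstract class SketchConsumer {
    // Called once per message, in offset order.
    protected abstract void handleMessage(long offset, byte[] payload);

    // Reads the fake partition from startOffset onwards and returns the
    // offset of the next message that has not been read yet.
    final long run(List<byte[]> partition, long startOffset) {
        long offset = Math.max(0L, startOffset);
        while (offset < partition.size()) {
            handleMessage(offset, partition.get((int) offset));
            offset++;
        }
        return offset;
    }
}

// The "business logic" here is just printing every message.
final class SketchPrintConsumer extends SketchConsumer {
    @Override
    protected void handleMessage(long offset, byte[] payload) {
        System.out.println(offset + ": " + new String(payload, StandardCharsets.UTF_8));
    }
}
```

With this split, sending messages somewhere else only requires another subclass that overrides handleMessage; the positioning code stays untouched.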

As in the other examples here, you can play around with modifying the sample consumer. You can even close the circuit by sending everything back to Couchbase Server. Kafka is distributed software, just like Couchbase Server, so keep that in mind when running on your own cluster and adjust the main() function accordingly. In our sample, we have only a single partition, partition (0) in Kafka, so our main looks like this:

Of course, in a production cluster you’ll be running more than one partition.
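One common way to handle several partitions is to run one worker per partition. The sketch below shows only that fan-out pattern with the standard library; PartitionFanOut and PartitionJob are hypothetical names, and a real job would wrap a consumer bound to the given partition instead of a lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PartitionFanOut {
    // Hypothetical per-partition job; returns how many messages it handled.
    interface PartitionJob {
        int consume(int partition) throws Exception;
    }

    // Runs the job once per partition, each on its own thread, and returns
    // the total number of messages handled across all partitions.
    static int runAll(int partitionCount, PartitionJob job)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(partitionCount);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                final int partition = p;
                Callable<Integer> task = () -> job.consume(partition);
                results.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // blocks until that partition's job finishes
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, PartitionFanOut.runAll(4, p -> p + 1) runs four jobs in parallel and sums their results. A long-running consumer would not return from its job until it is stopped, so in practice the loop over the futures acts as a wait for shutdown.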

Conclusion

I hope this helps you get off to a good start with Couchbase and Kafka. Cheers!

Author

Posted by Sergey Avseyev, SDK Engineer, Couchbase

Sergey Avseyev is an SDK Engineer at Couchbase. He is responsible for development of the Kafka connector and its underlying library, which implements DCP, the Couchbase replication protocol. He also maintains the PHP SDK for Couchbase.
