Introducing the Couchbase Kafka Connector

Sergey Avseyev of Couchbase Published

Welcome to the new Couchbase Kafka connector! It streams events from Couchbase Server (3.0 or later) to Kafka. It is still under development, so use it with care and open issues if you run into problems. The issue tracker is at https://issues.couchbase.com/browse/KAFKAC. Many thanks to Shibi of PayPal, who wrote an earlier connector based on an older interface; it inspired this one.

Getting it

The project source code is available on GitHub. Developer previews are published to our own Maven repository, and GA artifacts will be available on Maven Central. Here are the coordinates:

  • Group ID: com.couchbase.client
  • Artifact ID: kafka-connector
  • Version: 1.0.0-dp1
If you build with Gradle, a minimal build.gradle looks like this:

apply plugin: 'java'

repositories {
    mavenCentral()
    maven { url "http://files.couchbase.com/maven2" }
    mavenLocal()
}

dependencies {
    compile(group: 'com.couchbase.client', name: 'kafka-connector', version: '1.0.0-dp1')
}

Usage

Using the library is straightforward. Let's say we want to receive every modification from Couchbase Server and send only the document body to Kafka (by default, the connector serializes both the document body and its metadata to JSON). To do that, define a filter class that lets only MutationMessage instances pass through:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.filter.Filter;

public class SampleFilter implements Filter {
    @Override
    public boolean pass(final DCPEvent dcpEvent) {
        // Let only document mutations through; every other DCP message type is dropped.
        return dcpEvent.message() instanceof MutationMessage;
    }
}
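To see the filtering idea in isolation, here is a self-contained sketch that runs without Couchbase or Kafka. Message, Mutation and Deletion are hypothetical stand-ins for the DCP message classes, not the connector's API. The predicate plays the same role as SampleFilter.pass(). It assumes Java 16 or later for records.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {
    // Hypothetical stand-ins for DCP message types; NOT the connector's real classes.
    interface Message {}
    record Mutation(String key, String body) implements Message {}
    record Deletion(String key) implements Message {}

    // Same idea as SampleFilter.pass(): only mutations get through.
    static final Predicate<Message> MUTATIONS_ONLY = m -> m instanceof Mutation;

    static List<Message> filter(List<Message> events) {
        return events.stream().filter(MUTATIONS_ONLY).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> events = List.of(
                new Mutation("user::1", "{\"name\":\"Ann\"}"),
                new Deletion("user::2"),
                new Mutation("user::3", "{\"name\":\"Bob\"}"));
        System.out.println(filter(events).size()); // prints 2
    }
}
```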

Next, define an encoder class that takes the document value and converts it to a byte array:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.coder.AbstractEncoder;
import kafka.utils.VerifiableProperties;

public class SampleEncoder extends AbstractEncoder {
    public SampleEncoder(final VerifiableProperties properties) {
        super(properties);
    }

    @Override
    public byte[] toBytes(final DCPEvent dcpEvent) {
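        // SampleFilter lets only MutationMessage instances through, so this cast is safe when both are configured.
        MutationMessage message = (MutationMessage) dcpEvent.message();
        // Use an explicit charset; plain getBytes() would depend on the JVM's default encoding.
        return message.content().toString(CharsetUtil.UTF_8).getBytes(CharsetUtil.UTF_8);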
    }
}
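The encoder passes the body through a Java String. Calling String.getBytes() without an argument uses the JVM's default charset, so the bytes sent to Kafka could vary between machines. The sketch below uses only the JDK and shows the explicit UTF-8 conversion. The class and method names are illustrative only. The non-ASCII character is written as a Unicode escape so the source compiles the same under any encoding.

```java
import java.nio.charset.StandardCharsets;

public class EncodeSketch {
    // Encode a document body as UTF-8 explicitly, independent of the platform default.
    static byte[] toBytes(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String body = "{\"city\":\"Z\u00fcrich\"}";
        byte[] bytes = toBytes(body);
        // U+00FC takes two bytes in UTF-8, so there is one more byte than there are chars.
        System.out.println(body.length() + " chars, " + bytes.length + " bytes"); // prints 17 chars, 18 bytes
        // Decoding with the same charset restores the original text.
        System.out.println(new String(bytes, StandardCharsets.UTF_8).equals(body)); // prints true
    }
}
```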

That is essentially all you need to set up a Couchbase-to-Kafka bridge:

package example;

import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class Example {
    public static void main(String[] args) {
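        DefaultCouchbaseKafkaEnvironment.Builder builder =
                (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                        .builder()
                        // Plug in the filter and encoder defined above, by fully qualified class name.
                        .kafkaFilterClass("example.SampleFilter")
                        .kafkaValueSerializerClass("example.SampleEncoder")
                        // Turn on DCP, the change stream the connector consumes.
                        .dcpEnabled(true);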
        CouchbaseKafkaEnvironment env = builder.build();
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(
                env, "couchbase1.vagrant", "default", "", "kafka1.vagrant", "default");
        connector.run();
    }
}
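The builder takes the filter and encoder as fully qualified class names rather than instances. That suggests they are loaded by reflection, though this is an assumption and not documented behavior of the connector. Kafka's older producer loads its serializer.class in a similar way, which would explain why SampleEncoder needs a constructor taking VerifiableProperties. The sketch below illustrates only the general technique. Encoder and UpperCaseEncoder are hypothetical, and java.util.Properties stands in for VerifiableProperties.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Properties;

public class ReflectiveLoadSketch {
    // Hypothetical encoder contract; NOT the connector's or Kafka's real interface.
    public interface Encoder {
        byte[] toBytes(String value);
    }

    // Hypothetical encoder with a properties-taking constructor, mirroring SampleEncoder's shape.
    public static class UpperCaseEncoder implements Encoder {
        public UpperCaseEncoder(Properties props) {
            // A real encoder could read its settings from props here.
        }

        @Override
        public byte[] toBytes(String value) {
            return value.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Resolve a class by name, check it implements Encoder, and call its (Properties) constructor.
    static Encoder load(String className, Properties props) {
        try {
            Class<? extends Encoder> cls = Class.forName(className).asSubclass(Encoder.class);
            return cls.getConstructor(Properties.class).newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // getName() yields the binary name (with '$' for nested classes) that Class.forName expects.
        Encoder enc = load(UpperCaseEncoder.class.getName(), new Properties());
        System.out.println(new String(enc.toBytes("abc"), StandardCharsets.UTF_8)); // prints ABC
    }
}
```

With this approach, a misspelled class name or a missing constructor only surfaces at startup, as an exception from load(), and not at compile time.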

The couchbase1.vagrant and kafka1.vagrant hostnames above are the addresses of Couchbase Server and Kafka, respectively. You can set both up easily with the provisioning scripts in the project's env/ directory: just navigate there and run vagrant up.