On behalf of the SDK team I’m happy to announce the release of the second major version of the Couchbase Spark Connector, which most importantly brings compatibility with the Apache Spark 2.0.x series.

A special thanks goes to community contributors Luca Rosellini and Shivansh Srivastava, who both contributed to the connector and helped get it in shape for this release.

Getting It

As before, the artifacts are published on Maven Central under the following coordinates:

  • groupId: com.couchbase.client
  • artifactId: spark-connector_2.10 or spark-connector_2.11
  • version: 2.0.0

If you are using sbt, just use "com.couchbase.client" %% "spark-connector" % "2.0.0".
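
In a build.sbt that translates to the following dependency declaration, where %% resolves to spark-connector_2.10 or spark-connector_2.11 depending on your project's Scala version:

// build.sbt: %% appends the Scala binary version (2.10 or 2.11) automatically
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "2.0.0"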

Note that they are also published through spark-packages.org and can be downloaded as an archive as well, for both Scala 2.10 and Scala 2.11.

Highlights

Apart from bringing compatibility with Spark 2.0.x and various bugfixes, this release features enhanced Spark Streaming support as well as initial support for Structured Streaming.

Spark Streams

The connector previously featured experimental support for DStreams, built on a DCP (Database Change Protocol, Couchbase’s change stream protocol) implementation that is part of the Java SDK. For various reasons we ran into issues with that approach, and as a result we are investing significant effort into building a standalone DCP client that is low-overhead, stable and ready for production.

The Spark Connector 2.0.0 builds on top of a pre-release of this new client and will stabilize its implementation alongside that dependency going forward. As a result, the current implementation for the first time allows the cluster to be scaled dynamically while streaming changes, supports automatic flow control, and handles rollbacks transparently in case of node failures.

From an API point of view all of those changes are transparent: the same methods are used to initiate the stream and to consume the messages. Here is an example which creates a DStream and prints out every mutation and deletion that arrives from the server. It is possible to start streaming either from the very beginning or from “now”, where “now” means the current system state, so only future mutations and deletions are sent over the stream.
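
A minimal sketch of such a stream follows; the couchbaseStream helper, the FromBeginning/FromNow start points, the Mutation/Deletion message types and the bucket config key are written down here as assumptions, so verify them against the connector documentation for your version:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._

object StreamingExample {
  def main(args: Array[String]): Unit = {
    // Assumes Couchbase Server on localhost; the bucket config key follows the
    // connector convention "com.couchbase.bucket.<name>" (the value is the password).
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("CouchbaseDStreamExample")
      .set("com.couchbase.bucket.travel-sample", "")

    val ssc = new StreamingContext(conf, Seconds(5))

    // couchbaseStream(...) opens the DCP-based DStream; FromBeginning streams the full
    // history, FromNow only future changes (names as recalled, check your version).
    ssc.couchbaseStream(from = FromBeginning)
      .foreachRDD(_.foreach {
        case m: Mutation => println(s"Mutation of key ${new String(m.key)}")
        case d: Deletion => println(s"Deletion of key ${new String(d.key)}")
        case _           => // ignore other stream messages (e.g. snapshot markers)
      })

    ssc.start()
    ssc.awaitTermination()
  }
}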

In upcoming versions we are planning to provide support for persisting snapshot information and restoring state from it, to better survive Spark node failures and application restarts. If you need those guarantees right now, read on for Structured Streaming support, which already provides this today using built-in Spark functionality.

Structured Streaming

Support for Structured Streaming also builds on the new DCP client, but since Spark treats the mutations as an append-only table structure, right now only document changes are streamed and deletions are ignored. Once deletions can be properly signalled to Spark, we will provide support for them as well.

The connector can be used both as a source and as a sink.

Spark maintains its own WAL (Write-Ahead Log) and snapshots when consuming a stream, so it is possible (though not yet as efficient as we’d like it to be in future versions) to transparently resume from the latest consumed mutation.

Since a table abstraction requires some kind of schema, it is recommended to provide a custom one based on your documents; otherwise a default schema is used, which works but only provides limited insight into the content of a mutation. The default schema looks like this:
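
Roughly sketched, under the assumption that it exposes little more than the document ID and the raw document content (the exact field names below are assumptions and may differ in your connector version):

import org.apache.spark.sql.types._

// Roughly what the default schema amounts to: the document ID plus the raw,
// unparsed document body (field names are assumptions, verify against the docs).
val defaultSchema = StructType(Seq(
  StructField("META_ID", StringType), // document ID
  StructField("value", BinaryType)    // raw document content
))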

The following example shows how all mutations from the “travel-sample” bucket are streamed and then marshalled into the provided schema. Since just printing all the mutations is not very exciting, it groups them by the “type” field and prints out how often each type shows up:
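
A sketch of how that can look, assuming the source is addressed via the "com.couchbase.spark.sql" format name and the bucket is opened through the "com.couchbase.bucket.travel-sample" config key (both worth double-checking against the connector documentation):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object StructuredStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("StructuredStreamingExample")
      // open the travel-sample bucket (connector config convention, empty password)
      .config("com.couchbase.bucket.travel-sample", "")
      .getOrCreate()

    // A custom schema tailored to travel-sample documents; only the fields we care about.
    val schema = StructType(Seq(
      StructField("META_ID", StringType),
      StructField("type", StringType),
      StructField("name", StringType)
    ))

    // Read the mutation stream; the format name is an assumption, check the docs.
    val records = spark.readStream
      .format("com.couchbase.spark.sql")
      .schema(schema)
      .load()

    // Keep a running count per document type and print it to the console.
    val query = records.groupBy("type").count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}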

Since Spark keeps a total count, try changing one of the documents in the server UI and you’ll see the count of its type increase by one in the Spark logs.

In addition to functioning as a stream source, the connector can also be used as a sink to store results back into Couchbase. One important aspect is to specify the column that should be used as the document ID, which the following example illustrates:
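
A sketch of such a sink, again assuming the "com.couchbase.spark.sql" format name; the "idField" option is the one discussed here, and the checkpoint location is just a placeholder:

import org.apache.spark.sql.SparkSession

object StructuredSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("StructuredSinkExample")
      // target bucket for the results (illustrative; connector config convention)
      .config("com.couchbase.bucket.default", "")
      .getOrCreate()

    import spark.implicits._

    // Read lines from a local socket (start one with `nc -lk 9999`).
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Classic word count; the resulting "value" column holds the word itself.
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Write the running counts back to Couchbase, using the word ("value" column)
    // as the document ID via idField. Format name and checkpoint path are placeholders.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .option("checkpointLocation", "/tmp/couchbase-wordcount-checkpoint")
      .option("idField", "value")
      .format("com.couchbase.spark.sql")
      .start()

    query.awaitTermination()
  }
}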

This example consumes a socket stream, performs a word count on the received lines and then stores the counts in Couchbase. Note how “idField” is set to “value”, so the word itself is used as the document ID.

As a final note, remember that Structured Streaming itself is marked as experimental in Apache Spark, so as it changes and matures we’ll adapt our implementation as well. Since this is a new feature, we are also actively looking for feedback and bug reports.

See it in Action

If you want to try it out but don’t have a concrete example to work on, I recommend checking out our samples repository, which provides various examples based on our “travel-sample” bucket.

If you are at Spark Summit Europe this week in Brussels, there is a talk on the Connector called “Augmenting the Operational Database with Spark” where you can learn more about the motivation and internals as well.

Finally, if you are in the Bay Area and/or coming to Couchbase Connect there will be several presentations on Spark and its integration with Couchbase Server and we’d love to see you there!

Author

Posted by Michael Nitschinger

Michael Nitschinger works as a Principal Software Engineer at Couchbase. He is the architect and maintainer of the Couchbase Java SDK, one of the first completely reactive database drivers on the JVM. He also authored and maintains the Couchbase Spark Connector. Michael is active in the open source community and a contributor to various other projects such as RxJava and Netty.
