Spark Connector 1.0.0 Released

After two developer previews and one beta I'm super happy to announce the first stable release of our Couchbase Spark Connector. The timing is no coincidence, since next week Spark Summit Europe 2015 is happening in Amsterdam. We are sponsoring the event, and as a result you can find me and my colleagues there at the Couchbase booth!

This stable release marks the end of larger breaking changes, bringing stability into the API and a clear path going forward. If you haven't read the previous announcements, the following post provides a whirlwind tour of the features and capabilities.

The Connector is distributed from Maven Central (as well as spark-packages.org), so if you want to experiment with it using the spark-shell, this is all you need to get up and running:

To whet your appetite, here is a full code sample you can execute against our “travel-sample” dataset. It uses Spark SQL to create a data frame for all airlines (based on a predicate you specify) and then selects some fields and applies ordering as well as a limit:

This prints:

In a few lines of code you can run all kinds of queries for data analysis, ETL or machine learning on top of Couchbase. To me that's pretty awesome – if you also like it read on for all the details.

By the way, the full documentation can be found here.

Spark Core – The Scalable Foundation

The lowest user-facing API in Spark are the RDD (Resilient Distributed Datasets). It is basically a collection of data, which spark distributes all over the cluster. Since Spark is a big data crunching machine but not a database, it needs mechanisms to create RDDs as well as to persist RDDs at the end of the computations. To assist with this, Couchbase provides:

  • API to create RDDs through KeyValue, Views and N1QL
  • Persist RDDs into a Couchbase Bucket through KeyValue

The detailed documentation for those tasks is availabe here. The following code samples show you how to create RDDs easily as well as persist them. Note that these samples just expect a SparkContext to be available.

And here a more complicated example which reads all airlines, performs a classic word count on their names, aggregates the results and stores them in a document back in the Couchbase cluster:

As you can imagine, behind the scenes lots of things are going on. The API is turned into Couchbase queries, but more importantly the connector handles resources completely transparently. Since your computations will be executed on arbitrary workers in the cluster, the connector opens connections where needed in an efficient fashion. So you just need to tell Spark what do fetch or persist – the connector will handle the rest.

If you run Spark workers side-by-side Couchbase nodes, the connector tries to hint the proper worker for KeyValue operations (again, transparently). That way expensive network shuffle operations are reduced, leading to even better performance under such setups. Note that this is a pure optimization, you can run any topology you like and it will just work.

Spark SQL – A N1QL Lovestory

Spark SQL is a module for working with structured data. It allows the user to put a schema over an RDD, which is then called a DataFrame (previously SchemaRDD). Because Spark now has structure information of the data it is working with, it can apply all kinds of transformations and optimizations.

Couchbase Server 4.0 includes the brand new N1QL query language, which blends perfectly into the Spark SQL APIs. There is only one gotcha: documents stored in Couchbase are not required to adhere to a specific schema – that's one of its features. So how do we bring structure in a schemaless world?

The answer to that is automatic schema inference. If you create a DataFrame on top of Couchbase, you need to provide a “schemaFilter” which in turn will internally create a predicate. Then we will load lots of documents with that predicate and infer the schema from there. The following example shows how to create a DataFrame for airlines in the “travel-sample” bucket, which are identified by their type attribute in the document itself:

This prints:

If your documents are more or less similar, this approach works well. If your documents are completely schemaless so that every document looks very different, you can also provide the schema manually. This way, you specify only the fields you potentially need:

Finally, if this still doesn't work you can allways fall back to an RDD query and generate a DataFrame from the results:

This prints:

You can see how it even detects the recursive structure of JSON objects and arrays. This can be utilized as well at query time, giving you flexibility in both data modeling and querying.

Now that you have your DataFrame created, you can perform all kinds of queries against it:

This prints:

Here is a different example which shows how you can create a DataFrame from HDFS and join it with Couchbase rows:

One important piece of this is handled under the covers as well: the required fields and predicates are pushed down to the N1QL query engine on the server, so we only compute and transfer essential data, allowing for more efficient networking and CPU resource handling.

Spark Streaming – In-N-Out in (soft) Realtime

Spark Streaming brings a microbatch streaming approach to Spark, allowing you to perform both batching and streaming applications in one system. Couchbase allows you to persist such streams into Couchbase as well as (exerpimentally) creating such a stream through its internal document change protocol (DCP).

Persisting a DStream works the same way than persisting an RDD – you just need to use the right implicit import and convert it into a Document representation. The following examples shows you how to persist the content of tweets in a twitter feed into couchbase:

You can find more information about Spark Streaming support here.

The Road Ahead

Getting this first stable release out of the door was important. The next release (1.1) will bring official compatibility with Spark 1.5, as well as other enhancements and stability fixes. As always, please try out the connector and provide feedback on what you think we should improve.

Happy hacking, no bugs and quick shuffle operations!

Posted by Michael Nitschinger

Michael Nitschinger works as a Senior Software Engineer at Couchbase. He is the architect and maintainer of the Couchbase Java SDK, one of the first completely reactive database drivers on the JVM. He also authored and maintains the Couchbase Spark Connector. Michael is active in the open source community, a contributor to various other projects like RxJava and Netty.

One Comment

  1. Hi.

    How would this code look in databricks because if you run currently there is an error: developers should utilize the shared SparkContext instead of creating one using the constructor. In Scala and Python notebooks, the shared context can be accessed as sc. When running a job, you can access the shared context by calling SparkContext.getOrCreate()

    Code I am refering to:

    // Generate The Generic Spark Context
    val sc = new SparkContext(new SparkConf().setAppName(\”example\”)
    .setMaster(\”local[*]\”)
    .set(\”com.couchbase.bucket.travel-sample\”, \”\”))

    // Setup Spark SQL
    val sql = new SQLContext(sc)

    // Create a DataFrame with Schema Inference
    val airlines = sql.read.couchbase(schemaFilter = EqualTo(\”type\”, \”airline\”))

    // Perform the query
    airlines
    .select(\”name\”, \”iata\”, \”icao\”)
    .sort(airlines(\”name\”).asc)
    .limit(5)
    .show()

    Thanks,

    Mark

Leave a reply