It’s been a while since we introduced the first developer preview of our brand new Couchbase Spark Connector, so we thought it was time for another release with enhancements and a handful of new features. In particular:

  1. Native Support for Spark SQL
  2. Native Support for Spark Streaming
  3. Preferred Locations for Key/Value Access
  4. Dependency Upgrades

Most importantly, we now use Apache Spark 1.3, which will be the target version for the GA release of the connector.

If you want to dive in right now, here are the coordinates:

  1. Group ID: com.couchbase.client
  2. Artifact ID: spark-connector_2.10 or spark-connector_2.11
  3. Version: 1.0.0-dp2

Make sure to include the Couchbase Repository since it is a pre-release:

In addition to the plain Maven dependency, the connector is now also available on spark-packages.org!

Spark SQL Support

The main reason for upgrading to Spark 1.3 is its new DataFrame API. The DataFrame API is based on SQL, which allows us to integrate it tightly with N1QL, Couchbase’s own query language. This provides a seamless end-to-end experience that is both familiar and well integrated.

The connector allows you to define relations (Couchbase itself does not enforce a schema). You can either provide a schema manually or let the connector infer it for you. If you want automatic schema inference, it is a good idea to provide some kind of filter (for example on a type field) to get better accuracy. The connector will transparently load sample documents and create a schema from them, which you can then query.
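To make the idea of sampling-based schema inference more concrete, here is a minimal sketch in plain Python. It is not the connector’s implementation or API: the function names, the type names, the conflict rule and the sample documents are all assumptions made for illustration. It shows why a filter on a type field helps: without it, documents of different kinds get merged into one wide and less precise schema.

```python
from typing import Any, Dict, Iterable, Optional


def type_name(value: Any) -> str:
    """Map a JSON value to a coarse, SQL-like type name (hypothetical names)."""
    if isinstance(value, bool):  # bool must be checked before int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return "struct"
    if isinstance(value, list):
        return "array"
    return "null"


def infer_schema(
    docs: Iterable[Dict[str, Any]],
    type_field: Optional[str] = None,
    type_value: Optional[str] = None,
    sample_size: int = 1000,
) -> Dict[str, str]:
    """Infer a flat field -> type mapping from up to sample_size documents."""
    schema: Dict[str, str] = {}
    sampled = 0
    for doc in docs:
        # Skip documents that do not match the optional filter.
        if type_field is not None and doc.get(type_field) != type_value:
            continue
        if sampled >= sample_size:
            break
        sampled += 1
        for field, value in doc.items():
            seen = type_name(value)
            known = schema.get(field)
            if known is None or known == "null":
                schema[field] = seen
            elif seen != "null" and seen != known:
                # Assumed conflict rule: widen long/double to double,
                # fall back to string for everything else.
                numeric = {seen, known} == {"long", "double"}
                schema[field] = "double" if numeric else "string"
    return schema


# Illustrative documents only, loosely shaped like a travel data set.
docs = [
    {"type": "airline", "id": 10, "name": "40-Mile Air", "callsign": "MILE-AIR"},
    {"type": "airport", "id": "a-1254", "airportname": "Calais Dunkerque", "geo": {"lat": 50.96}},
    {"type": "airline", "id": 10123, "name": "Texas Wings", "callsign": None},
]

filtered = infer_schema(docs, type_field="type", type_value="airline")
unfiltered = infer_schema(docs)
```

With the filter, `id` is inferred as `long` and only airline fields appear. Without it, the airport document drags `id` down to `string` and adds fields that most rows will not have.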

Here is an example with the current travel-sample:

This first prints a schema like:

And then the results in the format of:

While this is great to work with, the real power comes when you want to combine DataFrames from different sources. The following example fetches data from HDFS and Couchbase and queries it together:

Note that nested JSON objects and arrays are currently not supported, but will be in the next version. They can show up in the schema, but the query will fail at execution time if you filter on one of those fields or include it in the result.

Spark Streaming Support

The first developer preview already provided support for writing data from a Spark Stream into Couchbase. In this developer preview, we’ve added experimental support for utilizing our “DCP” protocol to feed mutations and snapshot information into Spark as quickly as possible. You can use this for near real-time analysis of data as it arrives in your Couchbase Cluster.

Note that currently snapshot and rollback support are not implemented, so failover and rebalance will not work as expected.

Here is a simple example of how to feed changes from your bucket:

That’s all you need to do at this point. Make sure to test it on an empty bucket, because the stream currently also delivers all data that already exists in the bucket. The ability to start at the current snapshot will be added in a future release. If you run the code and then write documents into the bucket, Spark Streaming will notify you of those changes:

You can then filter for mutations and perform any kind of stream operation on them, including feeding them into a machine learning algorithm or storing them in a different datastore after processing (or even back into Couchbase).
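As a rough illustration of what “filtering for mutations” means, the following plain Python sketch models one batch of stream messages and keeps only the mutations before aggregating them. The message classes, their fields and the `::` key convention are hypothetical stand-ins chosen for this example. They are not the connector’s types, and a real job would apply the same filter to a Spark stream rather than to a list.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Union


@dataclass
class Mutation:
    """A document was created or updated (hypothetical message type)."""
    key: str
    content: bytes


@dataclass
class Deletion:
    """A document was removed (hypothetical message type)."""
    key: str


@dataclass
class Snapshot:
    """Marker describing a range of sequence numbers (hypothetical)."""
    start_seqno: int
    end_seqno: int


StreamMessage = Union[Mutation, Deletion, Snapshot]


def only_mutations(messages: Iterable[StreamMessage]) -> Iterator[Mutation]:
    """Drop deletions and snapshot markers, keep mutations."""
    for message in messages:
        if isinstance(message, Mutation):
            yield message


def count_by_key_prefix(mutations: Iterable[Mutation]) -> Dict[str, int]:
    """Count mutations per key prefix, assuming keys look like 'user::1'."""
    counts: Dict[str, int] = {}
    for mutation in mutations:
        prefix = mutation.key.split("::", 1)[0]
        counts[prefix] = counts.get(prefix, 0) + 1
    return counts


# One illustrative batch of messages as it might arrive from the bucket.
batch = [
    Snapshot(0, 3),
    Mutation("user::1", b"{}"),
    Mutation("order::7", b"{}"),
    Deletion("user::1"),
    Mutation("user::2", b"{}"),
]

counts = count_by_key_prefix(only_mutations(batch))
```

The same two steps (filter by message type, then aggregate) are what you would express with the stream’s own filter and map operations.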

Preferred Locations for Key/Value Access

One of the biggest performance killers when using Spark is the shuffle operation. A “shuffle” is an operation which involves transferring data from one worker to another over the network. Since network transfers are typically orders of magnitude slower than in-memory processing, shuffling should be avoided as much as possible.

When you are loading documents through their unique IDs, the connector now always (and transparently) hints to Spark where the document is located in Couchbase. As a result, if you deploy a Spark Worker on every Couchbase Node, Spark will be intelligent enough to dispatch the task directly to this worker, removing the need to transfer the document over the network and allowing for local processing.

Since the location is just a hint, you can still run your workers on different nodes, and Spark will dispatch the tasks to another worker if it can’t find a perfect match.
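The mechanics behind such a location hint can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not the connector’s code: the CRC32-based partition function is a stand-in for Couchbase’s key hashing, the round-robin partition map is invented (a real client reads the map from the cluster), and `pick_worker` is a hypothetical stand-in for Spark’s scheduler.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Sequence

NUM_PARTITIONS = 1024  # assumed partition count for this sketch


def partition_for(key: str) -> int:
    """Map a document ID to a partition (simplified CRC32-based hashing)."""
    crc = zlib.crc32(key.encode("utf-8")) & 0xFFFFFFFF
    return ((crc >> 16) & 0x7FFF) % NUM_PARTITIONS


def group_by_node(keys: Sequence[str], partition_map: Sequence[str]) -> Dict[str, List[str]]:
    """Group document IDs by the node that hosts their partition."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for key in keys:
        groups[partition_map[partition_for(key)]].append(key)
    return dict(groups)


def pick_worker(preferred_host: str, workers: Sequence[str]) -> str:
    """Use the worker on the preferred host if there is one, else fall back."""
    return preferred_host if preferred_host in workers else workers[0]


# Invented three-node cluster with partitions assigned round-robin.
nodes = ["node-a", "node-b", "node-c"]
partition_map = [nodes[i % len(nodes)] for i in range(NUM_PARTITIONS)]

keys = ["airline_%d" % i for i in range(20)]
groups = group_by_node(keys, partition_map)

# Workers run on two of the three nodes only, so one hint cannot be honored.
workers = ["node-a", "node-b"]
placement = {host: pick_worker(host, workers) for host in groups}
```

Each group of keys becomes one task whose preferred location is the node that owns those keys. When a worker runs on that node the documents are read locally, and otherwise the task still runs, just on a different worker.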

In the future, we also plan to add preferred locations for N1QL and View lookups, based on the target node where the query will be executed.

Planned Features And Next Steps

While this release brings us closer to completion, there is still a lot left to do. The next milestone will be a beta release and will include features like the Java API, enhancements to Spark SQL and Streaming support and, of course, fixes and stability improvements as we find them.

Since we haven’t reached beta yet, please provide as much feedback as possible while we can still change the API as needed.

Author

Posted by Michael Nitschinger

Michael Nitschinger works as a Principal Software Engineer at Couchbase. He is the architect and maintainer of the Couchbase Java SDK, one of the first completely reactive database drivers on the JVM. He also authored and maintains the Couchbase Spark Connector. Michael is active in the open source community and contributes to various other projects like RxJava and Netty.

11 Comments

  1. Is there a java port for this connector available?

    1. there is a java package "japi" that provides java-oriented wrappers: https://github.com/couchbasela

  2. about create index :

    create index testResult_E on testResult(Test12._asda) using gsi;
    {
    "requestID": "7c7ddd6c-55c5-4aec-9c61-d805c350fdfe",
    "signature": null,
    "results": [
    ],
    "status": "success",
    "metrics": {
    "elapsedTime": "4.970636925s",
    "executionTime": "4.969727845s",
    "resultCount": 0,
    "resultSize": 0
    }
    }

    In fact, the _asda key in the jsonDoc 'Test12' does not exist.
    why does it alert the error info?

    1. create index doesn’t index a particular document… what you instructed here is "in the bucket testResult, if it exists, index the JSON field _asda inside the JSON field Test12". creating an index for a single document makes no sense, just use "WHERE META(bucketName).id = documentId".

      in short, syntax is "CREATE INDEX indexName ON bucketName(oneFieldInJSON, orMore, or.even.path.in.JSON)".

      you can even put a where clause at the end, e.g. if one of your fields is a category, index only a specific category: CREATE INDEX topProductsByPrice ON products(price) WHERE category = "bestseller".

      also please prefer the forums (http://forums.couchbase.com) for interactions with the team and technical questions, it is far more closely watched and easy to follow for us ;)

      1. hi,
        In couchbase sdk , is there uuid-generator api?

        1. no there isn’t

          1. Thank you!
            where can i use the uuid seeing from guide?

            Can you tell me what js-engine the CS utilize?

          2. hi,
            how can i find out the path of js-lib used by CS?
            And how can i add sef-defined js-lib to CS?

          3. please use the forums (http://forums.couchbase.com) for interactions with the team and technical questions, it is far more closely watched, easier to work with for everyone involved and you can post more detailed questions there.

          4. sorry, i now cannot access the website. Because the httpresponse is always blank. Can you tell others’ disqus who can help me?

          5. hi,
            Does CS only own 10 buckets?
