This blog post explains our reasoning and motivation behind choosing RxJava as one of the integral components in our new Java SDK.

Motivation

There are many ways to design an API, and every one has its own set of benefits (and drawbacks). In the process of designing our brand new APIs, one of the main questions was how to expose them to the user.

One question we didn't have to ask ourselves was: should it be synchronous or asynchronous? We strongly believe that asynchronous APIs are the only sane way to get the performance and scalability you very often need, and it is also much easier to go from async to sync than the other way round. The current stable SDK (1.4.3 at the time of writing) already makes heavy use of Futures in various ways to provide async responses, and this dates back to 2006/7, when spymemcached originally introduced the concept into its API.

It is well known that the Java Future interface is very limited compared to other solutions (like Scala futures). In addition, it is also a bit trickier to code against if you need to build async dataflows where one computation depends on another and you want the whole thing to stay async. In recent versions we added support for listeners, which improve the situation quite a bit but still are not an ideal solution.
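To make that limitation concrete, here is a minimal sketch that uses only java.util.concurrent. The class and method names (FutureChaining, loadName) and the "user::" prefix are illustrative assumptions, not part of any SDK. It shows that a step depending on an earlier Future has to block on get() before it can even be submitted:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: two dependent lookups chained with plain Futures.
final class FutureChaining {

    // Runs two dependent tasks and returns the result of the second one.
    static String loadName(ExecutorService pool) {
        Future<Integer> idFuture = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 42; // stand-in for a remote lookup
            }
        });
        try {
            // The second step needs the first result, so the caller must
            // block here before the dependent task can be submitted.
            final int id = idFuture.get();
            Future<String> nameFuture = pool.submit(new Callable<String>() {
                @Override
                public String call() {
                    return "user::" + id;
                }
            });
            return nameFuture.get(); // blocks a second time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(loadName(pool)); // prints user::42
        } finally {
            pool.shutdown();
        }
    }
}
```

The calling thread is parked twice, which is exactly what an asynchronous dataflow is supposed to avoid.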

Over the last few years, other libraries and patterns emerged which we followed closely. One of the mature concepts is known as Reactive Extensions, originating out of Microsoft and .NET. It is based around the idea that applications should be event-oriented and react to those events in an asynchronous way. It defines a very rich set of operators for what you can do with the data (modify, combine, filter it and so on). Recently, Netflix ported it over to Java and nicknamed it RxJava (note that while the project currently lives under the Netflix namespace, it will be moved to “io.reactivex” sooner rather than later). It is very stable and also provides adapters for other JVM languages like Scala, Groovy and JRuby, which plays well with our plans to broaden support as well.

The Concept

The main idea of Rx revolves around Observables and their observers. If you haven't come across this concept, you can think of the Observable as the asynchronous and push-based cousin (or, more formally, the dual) of an Iterable. More specifically, here is their relation:

Event            Iterable (pull)     Observable (push)
retrieve data    T next()            onNext(T)
discover error   throws Exception    onError(Exception)
complete         returns             onCompleted()

Every time data gets pushed into an Observable, every observer that is subscribed to it receives the data in its onNext() method. If the Observable eventually completes (which doesn't always have to be the case), the onCompleted method is called. If an error occurs anywhere in the process, the onError method is called instead and the Observable is also considered to be terminated.

If you like grammar, the contract looks like this: 

OnNext* (OnCompleted | OnError)?
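The contract can be illustrated without RxJava at all. The sketch below uses a hand-rolled interface; SimpleObserver, ContractDemo, emit and record are hypothetical names invented for this illustration and are not RxJava types. It pushes each item through onNext and then signals exactly one terminal event:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an Rx-style observer; not the RxJava interface.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

final class ContractDemo {

    // Pushes every item to the observer, then signals exactly one
    // terminal event: onError on failure, onCompleted otherwise.
    static <T> void emit(Iterable<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return; // nothing may follow a terminal event
        }
        observer.onCompleted();
    }

    // Records the sequence of callbacks as strings so it can be inspected.
    static List<String> record(Iterable<String> items) {
        final List<String> events = new ArrayList<String>();
        emit(items, new SimpleObserver<String>() {
            @Override
            public void onNext(String value) {
                events.add("onNext(" + value + ")");
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError");
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [onNext(a), onNext(b), onCompleted]
        System.out.println(record(Arrays.asList("a", "b")));
    }
}
```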

Specifically, note that the contract makes no distinction between one and N values being returned; this can normally be inferred from the method you call and how it is documented. It does not change your programming flow anyway.

Since that is a little abstract, let's look at a concrete example. On the CouchbaseCluster class, there is a method called openBucket which initializes all needed resources and then returns a Bucket instance for you to work with. As you can imagine, opening sockets, grabbing a config and so forth takes some time, so this is a perfect candidate for an asynchronous call. The blocking API would look like:

interface Cluster {
        Bucket openBucket(String name, String password);
}

How can we make it asynchronous? We need to wrap it into an Observable:

interface Cluster {
        Observable<Bucket> openBucket(String name, String password);
}

So we now return an Observable which will eventually emit a Bucket instance that we can use. Let's add an observer:

cluster.openBucket().subscribe(new Observer<Bucket>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable done!");
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Something happened");
        e.printStackTrace();
    }

    @Override
    public void onNext(Bucket bucket) {
        System.out.println("Received bucket: " + bucket);
    }
});

Note that these methods are called on a different thread, so if you leave the code like this and quit your main thread afterwards, you probably won't see anything. While you could now write all the rest of your code in the onNext method, that's probably not the best way to do it. Since the bucket is something you want to open upfront, you could block on it and then proceed with the rest of your code. Every Observable can be converted into a blocking observable, which feels like an Iterable:

BlockingObservable<Bucket> blockingObservable = cluster.openBucket().toBlocking();

You will find many methods to iterate over the received data in a blocking fashion, but there are also shorthand methods if you only expect one single value (which we know is the case for us):

Bucket bucket = cluster.openBucket().toBlocking().single();

What happens here internally is that the value passed to onNext is stored for us and returned once onCompleted is called. If onError is called, the throwable is thrown directly and you can catch it.
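The mechanism can be sketched with a CountDownLatch. This is not RxJava's actual implementation; ValueObserver, AsyncSource and BlockingSingle are hypothetical types made up for the illustration, and the sketch leaves out the check that rejects sequences which do not contain exactly one item:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback type standing in for an Rx observer.
interface ValueObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical asynchronous source that accepts a single observer.
interface AsyncSource<T> {
    void subscribe(ValueObserver<T> observer);
}

final class BlockingSingle {

    // Parks the caller until the source terminates, then returns the
    // stored value or rethrows the stored error.
    static <T> T single(AsyncSource<T> source) {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<T>();
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();

        source.subscribe(new ValueObserver<T>() {
            @Override
            public void onNext(T v) {
                value.set(v); // remember the value, do not release yet
            }

            @Override
            public void onError(Throwable e) {
                failure.set(e);
                done.countDown();
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (failure.get() != null) {
            throw new RuntimeException(failure.get());
        }
        return value.get();
    }

    // Demo source that emits one value from a different thread.
    static AsyncSource<String> bucketNameSource() {
        return new AsyncSource<String>() {
            @Override
            public void subscribe(final ValueObserver<String> observer) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        observer.onNext("default");
                        observer.onCompleted();
                    }
                }).start();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(single(bucketNameSource())); // prints default
    }
}
```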

Unifying APIs

What you've seen so far barely scratches the surface. The bucket opening could very well be handled with a Future alone. Where Observables really shine is when you need to work with more than one returned result. In this case, a Future<T> doesn't fit the bill anymore, and Future<List<T>> or something similar does not have the same contract. Since Observables imply that more than one T can be returned, APIs can look the same whether one T or many Ts are returned.

Again, let's look at a concrete example. The SDK exposes a get method which returns one document. It looks like this:

interface Bucket {
        Observable<JsonDocument> get(String id);
}

But we also support querying (Views, N1QL), which potentially returns more than one result (or even none). Thanks to the Observable contract, we can build an API like this:

interface Bucket {
        Observable<ViewResult> query(ViewQuery query);
}

See? The contract implicitly says “if you pass in a query, you get N ViewResults back”, since you know how an Observable needs to behave. And for a bigger picture, here are even more methods that intuitively behave the way you expect them to:

interface Bucket {
    <D extends Document> Observable<D> insert(D document);
    <D extends Document> Observable<D> upsert(D document);
    <D extends Document> Observable<D> replace(D document);

    Observable<ViewResult> query(ViewQuery query);
    Observable<QueryResult> query(Query query);
    Observable<QueryResult> query(String query);

    Observable<Boolean> flush();
}

Async my dataflow!

So far we have seen what Observables can do for us and how they help us provide cohesive, simple and yet asynchronous APIs. But Observables really shine when it comes to composability. You can do lots of things with Observables, and we can't cover them all in this post. RxJava has very good reference documentation which can be found here, so check it out. It uses marble diagrams to show how async dataflows work, which is also something that we want to provide as part of our documentation in the future.

Let's consider a practical example: you want to load a document from Couchbase (a full-blown JSON object with user details), but you only want to do something with the firstname further down in your code. We can use the map function to map from the JsonDocument to the firstname String:

bucket
    .get("user::1")
    .map(new Func1<JsonDocument, String>() {
        @Override
        public String call(JsonDocument jsonDocument) {
            return jsonDocument.content().getString("firstname");
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String firstname) {
            System.out.println(firstname);
        }
    });

There are two important aspects here. First, every method that is chained here is also executed asynchronously, so it is not blocking the originating thread: once the get call against Couchbase returns, we map the firstname from the JSON document and then finally print it out. Second, you do not need to provide a full-blown Observer; if you are only interested in the onNext value, you can implement just that one (as shown here). See the overloaded methods for more examples.

Also note that I'm deliberately showing Java 6/7 style anonymous classes here. We also support Java 8, but more on that later. Now, how could we extend this chain if we only want to print out the name if it starts with an “a”?

bucket
    .get("user::1")
    .map(new Func1<JsonDocument, String>() {
        @Override
        public String call(JsonDocument jsonDocument) {
            return jsonDocument.content().getString("firstname");
        }
    })
    .filter(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s.startsWith("a");
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String firstname) {
            System.out.println(firstname);
        }
    });

Of course a simple if statement would suffice, but you can imagine that your filtering code could be much more complex (and probably call something else as well). As a final example on transforming observables, we are going to do something that comes up very often: you load a document, modify its content and then save it back into Couchbase:

bucket
    .get("user::1")
    .map(new Func1<JsonDocument, JsonDocument>() {
        @Override
        public JsonDocument call(JsonDocument original) {
            original.content().put("firstname", "SomethingElse");
            return original;
        }
    })
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(JsonDocument modified) {
            return bucket.replace(modified);
        }
    }).subscribe();

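FlatMap behaves very much like map; the difference is that the function you pass in returns an Observable itself, and the values emitted by that inner Observable are flattened into the resulting stream. That makes it perfectly suited to mapping over asynchronous operations.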
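If the distinction is new to you, the JDK's own CompletableFuture (Java 8) draws the same line: thenApply plays the role of map, and thenCompose plays the role of flatMap. The sketch below is only an analogy and not RxJava code; the replace method is a hypothetical stand-in for an asynchronous save:

```java
import java.util.concurrent.CompletableFuture;

final class MapVersusFlatMap {

    // Hypothetical asynchronous save that hands back what was stored.
    static CompletableFuture<String> replace(String document) {
        return CompletableFuture.supplyAsync(() -> "saved:" + document);
    }

    static String modifyAndSave(String document) {
        return CompletableFuture.completedFuture(document)
            // map-like: the function returns a plain value
            .thenApply(d -> "modified-" + d)
            // flatMap-like: the function returns another async container,
            // which is flattened instead of being nested
            .thenCompose(MapVersusFlatMap::replace)
            .join(); // block only to print the result in this demo
    }

    public static void main(String[] args) {
        System.out.println(modifyAndSave("user")); // prints saved:modified-user
    }
}
```

Using thenApply for the second step would produce a CompletableFuture nested inside another CompletableFuture, which is the same nesting problem flatMap avoids for Observables.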

One other aspect is that with Observables, sophisticated error handling is right at your fingertips. Let's implement an example which applies a timeout of 2 seconds and, if the call does not return in time, hands back something else instead:

bucket
    .get("user::1")
    .timeout(2, TimeUnit.SECONDS)
    .onErrorReturn(new Func1<Throwable, JsonDocument>() {
        @Override
        public JsonDocument call(Throwable throwable) {
            return JsonDocument.create("user::anonymous", JsonObject.empty().put("firstname", "john-doe"));
        }
    });

Here a dummy document is returned (pretending some reasonable defaults for our example) if the get call does not return in 2 seconds. This is just a simple example, but you can do a lot with exceptions, like retrying, branching out to other observables and so forth. Please refer to the official documentation (and Rx's documentation) for how to use them properly.
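For comparison, here is roughly what the same "time out, then fall back" behaviour takes with a plain JDK Future. This is a sketch under assumptions: TimeoutFallback and getOrFallback are hypothetical names, and a String stands in for the document. Like onErrorReturn, it falls back on any failure, not only on the timeout:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutFallback {

    // Runs the task and returns its result, or the fallback if the task
    // fails or does not finish within the given time.
    static String getOrFallback(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that is still running
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Fast task: the real value wins. Prints alice
        System.out.println(getOrFallback(() -> "alice", 2000, "john-doe"));

        // Slow task: the timeout fires first. Prints john-doe
        System.out.println(getOrFallback(() -> {
            Thread.sleep(5000);
            return "too late";
        }, 100, "john-doe"));
    }
}
```

The calling thread is blocked for the whole wait here, whereas the Observable version above declares the timeout and the fallback once and stays asynchronous.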

Wait, there is more

There are many more features available, like combining different observables (merging, zipping, concatenating), batching the results up in time intervals, performing side effects and others. Once you get over the initial (small) hurdle of understanding the concept, it feels very natural and we promise you won't want to go back (if we are wrong, though, you can always block on an Observable or convert it into a future).

RxJava also has decent Java 8 support, so if you are one of the lucky ones already able to use it in your projects, you can simplify an example from above to this:

bucket
    .get("user::1")
    .map(jsonDocument -> jsonDocument.content().getString("firstname"))
    .filter(s -> s.startsWith("a"))
    .subscribe(System.out::println);

Neat, right? RxJava also provides different language adapters on top of it, at the time of writing for Scala, Clojure, Groovy, JRuby and Kotlin. They can be used to provide even more language-specific integration, and we are also planning to use some of them to enhance Couchbase support for each of those languages as we see demand. Our topmost priority aside from the Java SDK is definitely Scala, so be on the lookout for some announcements sooner rather than later!

We hope that you are now as excited as we are, and we look forward to your feedback and questions through the usual channels!

Author

Posted by Michael Nitschinger

Michael Nitschinger works as a Principal Software Engineer at Couchbase. He is the architect and maintainer of the Couchbase Java SDK, one of the first completely reactive database drivers on the JVM. He also authored and maintains the Couchbase Spark Connector. Michael is active in the open source community, a contributor to various other projects like RxJava and Netty.

2 Comments

  1. Really looking forward to an announcement related to Scala. I've just had a look at http://reactivecouchbase.org/ but it currently depends on the 1.4 Java SDK. Is it worth waiting for your announcement before I start porting an application which is currently using mongodb and ReactiveMongo?

  2. […] asynchronous code. Some database vendors have understood this well: the CouchBase driver already uses Observables in its asynchronous driver. MongoDB, for its part, has published […]
