Introduction

Reactive programming and RxJava have become pretty hot topics, especially over the last couple of years. I got a taste of RxJava while trying it out in an Android project a little while ago. I had a simple threading problem, which I could have solved easily enough other ways. Since I had been reading about RxJava, though, I decided to try it out. I was immediately impressed with how much simpler and more understandable the code was.

Despite that success, RxJava has a bit of a reputation for being difficult to learn. In a more recent project, I wanted to handle live updates from a database. The database (Couchbase Lite) has a callback-based system for monitoring changes. I wanted to wrap that callback in a reactive structure. (This could have been an Observable or a Flowable. Look for a follow-on article about choosing between them.)

The first thing I discovered was that I couldn’t find a good example of a general version of what I wanted. There’s a simple example in the RxJava documentation, but it has some drawbacks I wanted to avoid. For instance, in the example, the Event object is assumed to carry a method to determine if a given event is the last in the stream. A lot of callbacks in Android don’t have such a method.

Although I did later discover a Stack Overflow post that covers the basics pretty well, I wanted to understand more.

A full discussion of the internals could, not surprisingly, fill a book. In this post I’ll just cover the core. The accompanying code includes several experiments that help in understanding the details, but that’s too much for one write-up, so we’ll save those for another time.

Objective

To be more explicit, we will look at taking a listener callback interface, common in event-driven programming, and wrapping it in an Observable.

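That is, how do we go from a listener callback that a source invokes once per event to an Observable that emits those same events to its subscribers?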

Android’s OnClickListener is one example of this type of API. OnClickListener is an interface with one method, onClick, which takes an Android View as its parameter. The Android system uses this to deliver streams of events such as button presses.

Getting Started

The source for this work can be found on GitHub here.

We’ll only look at a small part of that code in this post. Other parts of the code are designed to try out various experiments. Those may be something for future articles. For this, we’ll just focus on the central topic.

To follow along, clone the repo. The code is configured to be built with Gradle, so you can run it from the command line or import it into your favorite environment.

Creating a Source

The goal is to convert an API for listening to some kind of source of events. The first thing we need is an actual event stream to test with. The project includes a few sources to experiment with, so we’ll start with a base class.

Listing: BasicSource.java

This defines the interface, and has the common field and setter method needed by all implementations.
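For readers without the repo at hand, here is a minimal sketch of what such a base class could look like. It is not the project’s actual listing: the nested Listener interface, the onItem method name, and the volatile field are assumptions drawn from the description in this post.

```java
// Hypothetical sketch of a base class for event sources.
// Names and modifiers are assumptions, not the repo's actual code.
abstract class BasicSource<T> {

    /** The callback interface a source uses to hand items to a consumer. */
    interface Listener<T> {
        void onItem(T item);
    }

    // volatile so that a listener set or cleared on one thread
    // is visible to whichever thread is producing items
    protected volatile Listener<T> listener;

    /** The common setter; passing null detaches the current listener. */
    void setListener(Listener<T> listener) {
        this.listener = listener;
    }
}
```

A concrete source only has to decide when to call listener.onItem. Clearing the listener with setListener(null) is what later lets a subscriber break the flow.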

Next, derive a source that mimics an unbounded stream of events as follows.

Listing: UnboundSource.java

This version generates new items using a Supplier function, passed in to the constructor. This just shows there’s nothing special about the actual objects fed out, since the supplier could create anything.

We have a method to explicitly start creating items. Here we use an infinite loop to generate them indefinitely.

The assignment of listener to current works around a race condition where disposing the subscription can happen between the check for null and the actual invocation of the onItem callback.
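The pattern described above can be sketched roughly as follows. Again, this is an illustration rather than the repo’s listing. The base-class members are inlined so the block compiles on its own, and, unlike the unconditional infinite loop in the original, this sketch leaves the loop once the listener has been cleared so that a demo can terminate. Both choices are assumptions.

```java
import java.util.function.Supplier;

// Hypothetical sketch of an unbounded source.
// Base-class members are inlined here so the block stands alone.
class UnboundSource<T> {

    interface Listener<T> {
        void onItem(T item);
    }

    private final Supplier<T> supplier;
    private volatile Listener<T> listener;

    UnboundSource(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    void setListener(Listener<T> listener) {
        this.listener = listener;
    }

    /** Produces items until the listener is cleared (an assumption of this sketch). */
    void start() {
        while (true) {
            // Read the field exactly once. If another thread nulls the field
            // after this point, we still hold a valid reference, so the null
            // check and the call below cannot disagree.
            Listener<T> current = listener;
            if (current == null) {
                break;
            }
            current.onItem(supplier.get());
        }
    }
}
```

Checking the field and then calling through the field would be two separate reads, and the second could see null. Copying to a local first makes the check and the call use the same reference.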

Converting to an Observable

Great, so now we have a source that mimics, say, an open-ended series of button clicks. Next, let’s create a custom Observable.

We’ll use the method recommended in the RxJava documentation. This uses the Observable.create method, rather than subclassing Observable directly. (The project includes code to do the latter, too, for comparison.)

Take a look at the listing.

Listing: Observables.java

First, we create an instance of our UnboundSource.

Next, we create an instance of ObservableOnSubscribe using a lambda expression. The method we’re implementing, subscribe, has one parameter, an ObservableEmitter object.

This provides the connection between our source’s listener callback and a subscriber, through a second lambda expression. This second expression just checks emitter.isDisposed to make sure the subscription is still active, then pushes an item downstream by calling emitter.onNext. That’s the key line this has been building up to.

Having wired up our original callback, we want to provide a way to stop the flow. We use a Cancellable here for simplicity. The lambda expression breaks the flow by nulling out the listener callback.

Disposable would also work. This Stack Overflow answer gives a good idea of the difference between them, and how to choose which to use.

Disposables are the RxJava 2 solution to unsubscribing from a stream. This post by Kaushik Gopal explains some of the reasoning around the use of disposables in general.

With everything interconnected, we fire up the source to start generating events.

Instantiation and Subscription

With our ObservableOnSubscribe instance in hand, we can now create our Observable instance with the call to create.

The Observable class provides a fluent interface with quite a few methods available. We subscribe to the resulting observable using a method that breaks out the ‘onXXX’ functions into individual pieces.
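Putting the steps from the last two sections together, the wiring might look something like the sketch below. It assumes RxJava 2 (the io.reactivex package) is on the classpath. The Source class is a compact stand-in for the source described earlier, the names counter and run are made up for illustration, and take(5) is added only so the demo ends. None of these are taken from the repo.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ListenerToObservable {

    // Compact stand-in for the callback-based source (hypothetical names).
    static class Source<T> {
        interface Listener<T> { void onItem(T item); }

        private final Supplier<T> supplier;
        private volatile Listener<T> listener;

        Source(Supplier<T> supplier) { this.supplier = supplier; }

        void setListener(Listener<T> l) { listener = l; }

        // Assumption: stop producing once the listener has been cleared.
        void start() {
            Listener<T> current;
            while ((current = listener) != null) {
                current.onItem(supplier.get());
            }
        }
    }

    static Observable<Integer> counter() {
        AtomicInteger next = new AtomicInteger();
        Source<Integer> source = new Source<>(next::getAndIncrement);

        ObservableOnSubscribe<Integer> handler = emitter -> {
            // Bridge: each callback invocation becomes an onNext downstream.
            source.setListener(item -> {
                if (!emitter.isDisposed()) {
                    emitter.onNext(item);
                }
            });
            // Runs when the subscription is disposed; detaches the callback.
            emitter.setCancellable(() -> source.setListener(null));
            // Must come last: in this sketch start() blocks the calling thread.
            source.start();
        };
        return Observable.create(handler);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        counter()
            .take(5) // assumption: bound the stream so the demo terminates
            .subscribe(
                item -> log.add("next " + item),
                error -> log.add("error " + error),
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, once take(5) has seen its fifth item it disposes the upstream, which triggers the Cancellable and clears the listener, so the source loop exits. The order inside the handler matters: because start() does not return until the flow ends, the Cancellable has to be registered before the source is started.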

Output

If you run the example as shown, you should get something like the following output.

Everything is happening in series on the main thread. Not super interesting, for all that effort. We haven’t leveraged the full power of RxJava yet. For those familiar with RxJava, you’ll know how easy it is to make the code run asynchronously. That’s something interesting with quirks I didn’t expect. Again, something to explore in another post.

Commentary: Learning RxJava

When I originally decided to write this post, I wanted to talk about the things I learned while studying some of the internals of RxJava. In the end, it became far too much for one article.

Part of what makes RxJava challenging is the sheer number of APIs, but even the basics involve a web of intertwined interfaces.

For example, there’s only one version of Observable.create. It takes an ObservableOnSubscribe instance as an argument, as we’ve seen. Simple enough.

But then we dig down a bit. ObservableOnSubscribe is an interface with only one method, subscribe. Again, simple, yet this begins to reveal part of what makes understanding RxJava tricky.

Going further, we see ObservableOnSubscribe.subscribe supplies an ObservableEmitter as an argument. ObservableEmitter extends Emitter, and adds some methods for managing a Disposable. An Emitter, it turns out, has almost the same interface as an Observer. It only lacks the onSubscribe method.

These are mostly interface definitions. We haven’t touched on the implementations, and I’m only scratching the surface here.

The code is fascinating. Explore more, and you’ll get into how operators can modify what’s going on in a whole chain of calls, how RxJava does buffering, how some operations signal how many items to pass on, and more.

I’m looking forward to exploring further and passing on what I learn.

Postscript

Couchbase is open source and free to try out.
Get started with sample code, example queries, tutorials, and more.
Find more resources on our developer portal.
Follow us on Twitter @CouchbaseDev.
You can post questions on our forums.
We actively participate on Stack Overflow.
Hit me up on Twitter with any questions, comments, topics you’d like to see, etc. @HodGreeley

ReactiveX logo used courtesy of the ReactiveX projects under terms of the Creative Commons Attribution 3.0 License.

Posted by Hod Greeley, Developer Advocate, Couchbase

Hod Greeley is a Developer Advocate for Couchbase, living in Silicon Valley. He has over two decades of experience as a software engineer and engineering manager. He has worked in a variety of software fields, including computational physics and chemistry, computer and network security, finance, and mobile. Prior to joining Couchbase in 2016, Hod led developer relations for mobile at Samsung. Hod holds a Ph.D. in chemical physics from Columbia University.
