RxJava is an awesome way to do reactive programming in Java. In case you've never used or even heard of RxJava, it is a way of programming with asynchronous data streams. In other words, it is event-based programming where you observe, or listen to, a stream and do something whenever that stream emits data. Data streams can be anything ranging from variables to data structures.

For this example, let's say each line of CSV data is an item in a data stream. Many developers find themselves needing to load CSV data into their database more often than they expect. In most scenarios you cannot just read lines from a CSV file and drop them into a database, because the data has to be reshaped first. In this example we're going to see how to subscribe to a stream of data using RxJava, transform it, and then save it into Couchbase.

The Requirements

There are not too many requirements to get this project up and running. At a minimum you'll need the following:

  1. JDK 1.8
  2. Maven
  3. Couchbase Server

All of the development, including running the application, will happen with JDK 1.8 and Maven.

Understanding the Dataset and Data Model

A great way to get your feet wet when it comes to RxJava is to get a sample dataset to play around with. For simplicity we'll invent our own comma-separated values (CSV) file, but if you want something more extravagant, you can visit the data science website, Kaggle.

Let's assume that our simple CSV dataset has the following columns per row:

  1. id
  2. first_name
  3. last_name
  4. twitter

From a query and analysis perspective, working with the data in CSV format is nearly impossible. Instead, this data is going to be stored as NoSQL data so it can be processed later. We won't get into the number crunching and querying here, but it will come in a future article. Right now we just want to get it into NoSQL format.

When loaded into Couchbase, each row of the CSV will look something like the following:
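As a rough illustration, a stored row might be shaped like this. The field names come from the column list above; the values are made-up placeholders, and using the id column as the document key rather than as a field inside the document is an assumption.

```json
{
    "first_name": "Jane",
    "last_name": "Doe",
    "twitter": "@janedoe"
}
```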

Yes, the above chunk of data is a JSON document, which is what Couchbase supports. Now that we know the data goals, we can begin loading the CSV data into Couchbase with RxJava.

Transforming the Raw Data and Writing to Couchbase

To use RxJava to load CSV data via a Java application, a few dependencies must be included. We need to include RxJava, OpenCSV, and the Couchbase Java SDK. Since we're using Maven, all can be included via the Maven pom.xml file. To include RxJava, include the following dependency in your Maven file:
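A dependency entry along these lines should do. The 1.x line is assumed because the .from method used later in this article belongs to the RxJava 1 API; the exact version number is an assumption, so check which release you want.

```xml
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <!-- Version is an assumption; any RxJava 1.x release should fit this example -->
    <version>1.3.8</version>
</dependency>
```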

Since the raw data will be in the form of CSV, we can use the open source CSV library for Java called OpenCSV. The Maven dependency for OpenCSV can be added like this:
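A sketch of the entry, assuming the com.opencsv coordinates used by the 3.x releases. The version number is an assumption rather than a requirement of this article.

```xml
<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <!-- Version is an assumption; pick the release that suits your project -->
    <version>3.8</version>
</dependency>
```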

Finally, Java needs to be connected to Couchbase Server. This can be done through the Couchbase Java SDK. To add this dependency into your Maven project, add the following to your pom.xml file:
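A sketch of the entry, assuming the 2.x line of the SDK, whose API is built on RxJava 1. The version number shown is an assumption, so substitute a 2.x release you have verified.

```xml
<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>java-client</artifactId>
    <!-- Version is an assumption; any 2.x release should expose the calls used here -->
    <version>2.3.5</version>
</dependency>
```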

All the project dependencies are good to go!

To load, but not read, the CSV file, you'll create a new CSVReader object as follows:
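A minimal sketch, assuming OpenCSV 3.x on the classpath. The file name data.csv and the class name are placeholders, not something this article specifies.

```java
import com.opencsv.CSVReader;
import java.io.FileReader;
import java.io.IOException;

class CsvOpenSketch {
    public static void main(String[] args) throws IOException {
        // "data.csv" is a placeholder path (assumption)
        try (CSVReader reader = new CSVReader(new FileReader("data.csv"))) {
            // Nothing has been read yet; rows are pulled only when
            // something iterates over the reader
        }
    }
}
```

Creating the reader only opens the file. The rows themselves are consumed later, when the reader is handed to the Observable.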

Since this data will eventually be processed into Couchbase, we must connect to our server and open our bucket.
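A minimal sketch, assuming the 2.x Couchbase Java SDK on the classpath. The host name, bucket name, and class name are illustrative.

```java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;

class CouchbaseConnectSketch {
    public static void main(String[] args) {
        // Assumes a Couchbase Server node reachable on localhost
        Cluster cluster = CouchbaseCluster.create("localhost");
        // Assumes a bucket named "default" with no password set
        Bucket bucket = cluster.openBucket("default");
        System.out.println("Opened bucket: " + bucket.name());
        // Release sockets and threads once the work is done
        cluster.disconnect();
    }
}
```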

The above assumes that Couchbase is running locally and the data will be saved in the default bucket without a password.

To process the CSV dataset, an RxJava Observable can be created:
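The sketch below puts the pieces together, assuming RxJava 1.x, OpenCSV 3.x, and the 2.x Couchbase Java SDK. The file name, the class name, the absence of a header row, and the choice of the id column as the document key are all assumptions made for illustration.

```java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.opencsv.CSVReader;
import java.io.FileReader;
import java.io.IOException;
import rx.Observable;

class CsvToCouchbaseSketch {
    public static void main(String[] args) throws IOException {
        Cluster cluster = CouchbaseCluster.create("localhost");
        Bucket bucket = cluster.openBucket("default");
        // "data.csv" is a placeholder path; the file is assumed to have no header row
        try (CSVReader reader = new CSVReader(new FileReader("data.csv"))) {
            Observable
                // CSVReader is an Iterable of String[], one array per row
                .from(reader)
                // Columns: 0 = id, 1 = first_name, 2 = last_name, 3 = twitter
                .map(csvRow -> {
                    JsonObject content = JsonObject.create()
                        .put("first_name", csvRow[1])
                        .put("last_name", csvRow[2])
                        .put("twitter", csvRow[3]);
                    // The id column becomes the document key (assumption)
                    return JsonDocument.create(csvRow[0], content);
                })
                // Save each document as it is emitted; print any failure
                .subscribe(
                    document -> bucket.upsert(document),
                    error -> System.err.println(error));
        } finally {
            cluster.disconnect();
        }
    }
}
```

Because no scheduler is specified, the rows are read and written on the calling thread, so the pipeline has finished by the time the reader is closed.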

To break down what is happening in the Observable, the following steps occur.

The CSVReader creates an Iterable. The Observable will use the Iterable as the source of data using the .from method.

Each row read will be an array of strings, not something that can be stored directly in the database. Using the .map function, the array of strings can be transformed into whatever we decide. In this case, the goal is to map each line of the CSV to a Couchbase document. During this mapping process we can do further data cleanup. For example, we could do something like csvRow[*].trim() to strip out any leading and trailing whitespace in each CSV column.
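The csvRow[*].trim() idea is shorthand rather than valid Java. One way to spell it out, using only the standard library, is a small helper like the one below. The class and method names are hypothetical.

```java
import java.util.Arrays;

class CsvRowCleaner {
    // Returns a new array with leading and trailing whitespace
    // removed from every column of the row
    static String[] trimAll(String[] csvRow) {
        return Arrays.stream(csvRow)
            .map(String::trim)
            .toArray(String[]::new);
    }

    public static void main(String[] args) {
        String[] raw = {" 1", "Jane ", " Doe ", "@janedoe"};
        // prints [1, Jane, Doe, @janedoe]
        System.out.println(Arrays.toString(trimAll(raw)));
    }
}
```

A helper like this could run as its own .map step ahead of the one that builds the document, so the cleanup stays separate from the conversion.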

Finally, once each line has been read and transformed, it must be saved to Couchbase. The .subscribe method subscribes to the notifications that the Observable emits, in this case the transformed documents.

Conclusion

You just got a taste of loading dirty CSV data into Couchbase by using RxJava and the Couchbase Java SDK. By doing reactive programming you can take actions on any data stream that you're observing. In our scenario we just wanted to load a CSV file into Couchbase.

If you've got a massive dataset, you can take this tutorial to the next level by using Apache Spark. I wrote a very similar CSV loader found here that makes use of Spark.

Posted by Nic Raboy, Developer Advocate, Couchbase