I recently saw a question on our forums where someone wanted to move some data from CouchDB to Couchbase. Since I usually help friends in need when they are moving, I thought I would help. Moving requires preparation, especially with large furniture like Couch 😀

Sorry about that metaphor. Now let me explain how I did it. My goal is to move data from CouchDB to Couchbase, so the first question is how to get the data out of CouchDB. There are several options, and the most straightforward for me was the REST API. If you call the _all_docs endpoint with the include_docs parameter set to true, you get every document along with its content. This is exactly what I need.
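A minimal sketch of building that request URL. The host, port, and database name are placeholders, and `AllDocsUrl` is a hypothetical helper rather than part of any SDK; only `_all_docs` and `include_docs=true` come from CouchDB itself.

```java
import java.net.URI;

// Hypothetical helper: builds the CouchDB URL that returns every document.
final class AllDocsUrl {
    static URI forDatabase(String couchBase, String database) {
        // Drop a trailing slash so the path never contains "//".
        String base = couchBase.endsWith("/")
                ? couchBase.substring(0, couchBase.length() - 1)
                : couchBase;
        // include_docs=true asks CouchDB to embed each document body in its row.
        return URI.create(base + "/" + database + "/_all_docs?include_docs=true");
    }

    public static void main(String[] args) {
        // Placeholder host and database name.
        System.out.println(forDatabase("http://localhost:5984", "mydb"));
    }
}
```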

Now instead of downloading the REST response directly, I can use the Java 8 stream API. And since I will be using RxJava as it’s part of our SDK, I need to wrap that stream in an observable. It’s actually quite simple:
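A minimal sketch of that idea, using only the JDK so it runs without the SDK on the classpath. `ResponseLines` is a hypothetical name, and the `StringReader` stands in for the HTTP response body. The RxJava wrap itself appears only as a comment, on the assumption that RxJava 1.x is in use, where `Observable.from` accepts an `Iterable`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.stream.Stream;

final class ResponseLines {
    // Lazily exposes a response body as a stream of lines.
    static Stream<String> of(Reader body) {
        BufferedReader reader = new BufferedReader(body);
        // lines() reads on demand; closing the stream also closes the reader.
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing fails.
            }
        });
    }

    public static void main(String[] args) {
        // In a real import the Reader would wrap the stream opened on the _all_docs URL.
        try (Stream<String> lines = of(new StringReader("first\nsecond\nthird"))) {
            // Assumed RxJava 1.x wrap, shown as a comment only:
            //   Observable.from((Iterable<String>) lines::iterator)
            lines.forEach(System.out::println);
        }
    }
}
```

Because the stream is lazy, the response is never held in memory as a whole, which matters for a large database.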

With this I have an observable that emits the CouchDB response line by line. The next step is of course to take the documents and send them to Couchbase. I first use a flatMap to parse each line of the response. That is easy because each row line of the response contains exactly one document.

I have to handle the first and last lines separately. The first line carries the total_rows and offset information, and there is nothing to do for the last line. I simply return Observable.empty() for those two lines, as I don't have anything to feed to the next operator. All the other lines contain a row produced by CouchDB. Each of these rows contains a JSON document we can wrap in a JsonNode.
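The line handling can be sketched in plain Java. This is a simplification under stated assumptions: `RowLines` is a hypothetical name, the sample response is made up in the shape CouchDB returns, and the sketch stops at the raw row string instead of wrapping it in a Jackson JsonNode. An empty `Optional` plays the role of `Observable.empty()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class RowLines {
    // Returns the row JSON carried by one line, or empty for the first and last lines.
    static Optional<String> rowOf(String line) {
        String trimmed = line.trim();
        // First line: {"total_rows":N,"offset":0,"rows":[   Last line: ]}
        if (trimmed.isEmpty() || trimmed.startsWith("{\"total_rows\"") || trimmed.equals("]}")) {
            return Optional.empty();
        }
        // Every row except the last ends with a comma separating it from the next row.
        if (trimmed.endsWith(",")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        return Optional.of(trimmed);
    }

    public static void main(String[] args) {
        // Made-up sample in the shape of an _all_docs?include_docs=true response.
        List<String> response = Arrays.asList(
            "{\"total_rows\":2,\"offset\":0,\"rows\":[",
            "{\"id\":\"a\",\"key\":\"a\",\"value\":{\"rev\":\"1-x\"},\"doc\":{\"_id\":\"a\",\"_rev\":\"1-x\",\"name\":\"first\"}},",
            "{\"id\":\"b\",\"key\":\"b\",\"value\":{\"rev\":\"1-y\"},\"doc\":{\"_id\":\"b\",\"_rev\":\"1-y\",\"name\":\"second\"}}",
            "]}");
        response.stream()
                .map(RowLines::rowOf)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .forEach(System.out::println);
    }
}
```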

The next operator is also a flatMap. Here I extract the key of the document and its content as a String. Since I already have the JSON object as a String, I don't need any kind of mapping with Jackson or the like; I can use a RawJsonDocument directly. Once I have the RawJsonDocument I can import it into Couchbase with the upsert method. It's a bit like a "no questions asked" thing. We don't care whether the document exists or not: if it doesn't exist it is created, and if it does exist it is replaced. This might not be the behaviour you want, but it's the simplest for this scenario as it means I don't have to handle errors when a key already exists.
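To make the "no questions asked" semantics concrete, here is a small sketch with an in-memory map standing in for the bucket. `InMemoryBucket` is hypothetical and is not the Couchbase API; it only contrasts an upsert with an insert that refuses to overwrite.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a bucket; not the Couchbase API.
final class InMemoryBucket {
    private final Map<String, String> docs = new HashMap<>();

    // Upsert: create the document if it is missing, replace it otherwise.
    void upsert(String key, String rawJson) {
        docs.put(key, rawJson);
    }

    // Insert, for contrast: fails when the key already exists.
    void insert(String key, String rawJson) {
        if (docs.putIfAbsent(key, rawJson) != null) {
            throw new IllegalStateException("Document already exists: " + key);
        }
    }

    String get(String key) {
        return docs.get(key);
    }

    public static void main(String[] args) {
        InMemoryBucket bucket = new InMemoryBucket();
        bucket.upsert("a", "{\"name\":\"first\"}");
        bucket.upsert("a", "{\"name\":\"replaced\"}"); // no error, content replaced
        System.out.println(bucket.get("a"));
    }
}
```

With insert semantics, replaying an import would need an error handler for every existing key; with upsert, a replay simply overwrites.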

I assign a 500 millisecond timeout to the operation, as it really should not take long. Then I use the RetryBuilder, a nice helper added by Simon Basle to easily manage retries on error. Here I retry up to 100 times if I get a RequestCancelledException, with an arbitrary delay of 31 seconds before each retry. I do the same for the TemporaryFailureException and the BackpressureException, but with a delay of 100 milliseconds instead.
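The retry policy itself (a bounded number of retries, a fixed delay, and only for chosen exception types) can be sketched in plain Java. This is not the RetryBuilder API: `FixedDelayRetry` is a hypothetical helper, and it blocks the calling thread, whereas the SDK version works on the asynchronous stream.

```java
import java.util.concurrent.Callable;

final class FixedDelayRetry {
    // Runs the task, retrying up to maxRetries times when it throws the given exception type.
    static <T> T call(Callable<T> task, Class<? extends Exception> retryOn,
                      int maxRetries, long delayMillis) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                // Give up on unrelated errors, or once the retry budget is spent.
                if (!retryOn.isInstance(e) || retries >= maxRetries) {
                    throw e;
                }
                retries++;
                Thread.sleep(delayMillis); // fixed delay before each retry
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a retryable error, then succeeds on the third call.
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("temporarily busy");
            }
            return "stored";
        }, IllegalStateException.class, 100, 100L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

In the article's terms, the first policy would use 100 retries with a 31 second delay, and the second 100 retries with a 100 millisecond delay.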

Then I use doOnError and doOnNext to log the key of the document to either the success log file or the error log file. The doOnX methods don't change the core semantics of your stream (as a data transformation would, for instance); they add behavior on the side, known as side effects. In there I write a String to a log file using FileWriter. This is unfortunately synchronous and blocking. I might change this to use an async logger instead.
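The same side-effect idea exists in the Java 8 stream API as `peek`, which makes for a dependency-free illustration. This is an analogy to doOnNext rather than the RxJava operator, and `KeyLogger` is a hypothetical name; any `Writer`, including a `FileWriter`, can serve as the log.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class KeyLogger {
    // Passes keys through unchanged while appending each one to the log as a side effect.
    static List<String> logAndCollect(Stream<String> keys, Writer log) {
        return keys.peek(key -> {
            try {
                // Synchronous, blocking write, like the FileWriter in the article.
                log.write(key + System.lineSeparator());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).collect(Collectors.toList());
    }
}
```

The returned list holds exactly the keys that went in, which is the point: the logging observes the stream without changing what flows through it.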

I then use onErrorResumeNext to make sure the import continues even if there are errors. Finally I use count().toBlocking().single() to know how many documents were written to Couchbase, and I compare the result to total_rows at the end.
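A blocking, plain-Java sketch of that last step: keep going when a document fails, count the successes, and read total_rows from the first response line for the comparison. `ImportCount` and its methods are hypothetical names, and the regular expression assumes the first line has the `{"total_rows":N,...` shape described earlier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ImportCount {
    private static final Pattern TOTAL_ROWS = Pattern.compile("\"total_rows\":(\\d+)");

    // Reads total_rows from the first line of the _all_docs response, or -1 if absent.
    static long totalRows(String firstLine) {
        Matcher m = TOTAL_ROWS.matcher(firstLine);
        return m.find() ? Long.parseLong(m.group(1)) : -1;
    }

    // Applies the writer to every key, skipping failures so one bad document does not stop the import.
    static long importAll(List<String> keys, Consumer<String> writer) {
        long imported = 0;
        for (String key : keys) {
            try {
                writer.accept(key);
                imported++;
            } catch (RuntimeException e) {
                // Skipped on purpose; the article logs the key to the error file first.
            }
        }
        return imported;
    }

    public static void main(String[] args) {
        long expected = totalRows("{\"total_rows\":3,\"offset\":0,\"rows\":[");
        // The writer here is a stand-in that rejects one made-up key.
        long imported = importAll(Arrays.asList("a", "bad", "c"), key -> {
            if (key.equals("bad")) {
                throw new IllegalStateException("could not write " + key);
            }
        });
        System.out.println(imported + " of " + expected + " documents imported");
    }
}
```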

In the end the code looks like this:

There are still a lot of enhancements possible, such as a configuration option to choose the level of consistency you want during the import (the PersistTo and ReplicateTo options). It would also be nice to accept the error log, which contains only the keys of the documents that were not imported, as input to the script. This way you could replay the import for only the documents that failed.

Anyway, I hope it helps. Feedback is welcome!

Posted by Laurent Doguin, Developer Advocate, Couchbase

Laurent is a Paris-based Developer Advocate who focuses on helping Java developers and the French community. He writes code in Java and blog posts in Markdown. Prior to joining Couchbase he was Nuxeo's community liaison, where he devoted his time and expertise to helping the entire Nuxeo Community become more active and efficient.
