You’re probably aware of the new Cross Data Center Replication (XDCR) feature of Couchbase Server 2.0.  Its most obvious utility is to allow you to replicate data from one Couchbase cluster to another.  However, there are more novel use cases for XDCR.  One of the more notable examples is Couchbase and Elastic Search integration.

Performing XDCR against a non-Couchbase cluster is possible because the communication endpoint is implemented using a simple RESTful API.  By implementing a few simple methods, you can set up your own replication endpoint.  There are a variety of reasons you might want to build your own XDCR endpoint.  One simple example is to receive change notifications.

Setting up an endpoint is pretty straightforward and may be done using any web framework suitable for building RESTful APIs.  In .NET, one option is ASP.NET MVC and the new Web API.  While that is a good option, a simpler solution is to use the Sinatra-inspired Nancy.

Nancy is a lightweight microframework for building HTTP-based applications with .NET.  In the most basic case, you create a module that extends NancyModule and define a route and its handler.

public class SampleModule : Nancy.NancyModule
{
    public SampleModule()
    {
        Get["/"] = _ => "Hello World!";
    }
}

To demonstrate how to create an XDCR endpoint using Nancy, I’ve published a Couchbase Labs project on GitHub named couchbase-xdcr-nancy.  This project is based heavily on Jasdeep’s couchbase-xdcr-sinatra code.

Inside the Visual Studio project, you’ll find a number of plumbing files.  I won’t go over the code in detail here, but will show a couple of highlights.  The most important code is inside the XdcrModule class.  That’s where you’ll find the handlers for the various resource requests made by the XDCR service.

Within the constructor for this module, you’ll find a handful of GET handlers that handle requests to URIs starting with /pools.  These are effectively handshake URIs used by XDCR to discover information about the cluster and its buckets (similar to the endpoints used by Couchbase SDKs to bootstrap client instances).  Note that in this sample, the bucket is hardcoded to “default.”  You could easily modify that value or make it a configuration setting.

Get["/pools"] = x =>
{
    var output = new
    {
        pools = new object[]
        {
            new { name = "default", uri = "/pools/default?uuid=" + UUID_POOL }
        },
        uuid = UUID_POOL
    };

    return Response.AsJson(output);
};

After the GET handlers, there are a couple of POST handlers that respond to the actual XDCR feed.  The XDCR service will pass parameters as part of the path.  Regex-based routes are used for this purpose.

The first POST handler receives a list of documents and their revisions from the XDCR service.

Post[REGEX_REVS_DIFF] = x =>
{
    var body = "";
    Context.Request.Body.Position = 0;
    using (var sr = new StreamReader(Context.Request.Body))
    {
        body = sr.ReadToEnd();
    }
    var jobj = JObject.Parse(body);

    var outDict = new Dictionary<string, object>();
    foreach (var item in jobj)
    {
        var key = item.Key;
        var rev = item.Value.ToString();
        if (handler.IsMissing(key, rev))
        {
            outDict[key] = new { missing = rev };
        }
    }

    return Response.AsJson(outDict);
};

Within this handler, the JSON is parsed to discover each key and revision.  If that combination has not already been replicated, it is added to the list of “missing” keys included in the response to the request.  XDCR will then send those documents to another POST handler, where they’ll be created.

Post[REGEX_BULK_DOCS] = x =>
{
    var body = "";
    Context.Request.Body.Position = 0;
    using (var sr = new StreamReader(Context.Request.Body))
    {
        body = sr.ReadToEnd();
    }
    var jobj = JObject.Parse(body);

    var newEdits = jobj.Value<bool>("new_edits");
    var docs = jobj.Value<JArray>("docs");
    foreach (var doc in docs)
    {
        var originalDoc = Encoding.UTF8.GetString(Convert.FromBase64String(doc.Value<string>("base64")));
        var meta = doc["meta"] as JObject;

        var document = new Document
        {
            Id = meta.Value<string>("id"),
            Revision = meta.Value<string>("rev"),
            Expiration = meta.Value<int>("expiration"),
            Flags = meta.Value<int>("flags"),
            Value = originalDoc
        };

        handler.CreateDocument(document);
    }

    return HttpStatusCode.Created;
};
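To make the payload handling above easier to try outside of Nancy, here is a minimal, dependency-free sketch of the same decoding step.  It swaps Json.NET’s JObject for System.Text.Json (so it assumes .NET Core 3.0 or later), and the BulkDocsSketch class and its Decode method are hypothetical names used only for illustration.  The payload shape is inferred from the handler above, not taken from an XDCR specification.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper: not part of couchbase-xdcr-nancy.
public static class BulkDocsSketch
{
    // Returns the id, revision and decoded body of each entry in "docs".
    public static List<(string Id, string Rev, string Body)> Decode(string json)
    {
        var result = new List<(string Id, string Rev, string Body)>();
        using (var parsed = JsonDocument.Parse(json))
        {
            foreach (var doc in parsed.RootElement.GetProperty("docs").EnumerateArray())
            {
                var meta = doc.GetProperty("meta");

                // The document body arrives base64-encoded; decode it as UTF-8 text.
                var bytes = Convert.FromBase64String(doc.GetProperty("base64").GetString());
                var body = Encoding.UTF8.GetString(bytes);

                result.Add((meta.GetProperty("id").GetString(),
                            meta.GetProperty("rev").GetString(),
                            body));
            }
        }
        return result;
    }
}
```

Calling BulkDocsSketch.Decode with a body shaped like the one the handler reads yields one tuple per replicated document, which you could then map onto the Document class before passing it to your handler.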

Within the sample, I’ve included a very simple interface used to create a pluggable handler that will check the existence of the document and create it when necessary.

public interface IReplicationHandler
{
    bool IsMissing(string key, string rev);

    void CreateDocument(Document document);
}
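As an illustration of how small an implementation of this interface can be, here is a sketch of an in-memory handler.  InMemoryReplicator is a hypothetical class that is not part of the sample project, and the Document class shown is an assumed shape based on the properties the POST handler sets.  Unlike a store that keeps every revision, this sketch keeps only the latest revision per key.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the sample's Document class (inferred from the POST handler).
public class Document
{
    public string Id { get; set; }
    public string Revision { get; set; }
    public int Expiration { get; set; }
    public int Flags { get; set; }
    public string Value { get; set; }
}

public interface IReplicationHandler
{
    bool IsMissing(string key, string rev);

    void CreateDocument(Document document);
}

// Hypothetical handler that keeps replicated documents in a dictionary.
public class InMemoryReplicator : IReplicationHandler
{
    private readonly Dictionary<string, Document> _docs = new Dictionary<string, Document>();

    // A key/revision pair is missing unless that exact revision is already stored.
    public bool IsMissing(string key, string rev)
    {
        Document existing;
        return !(_docs.TryGetValue(key, out existing) && existing.Revision == rev);
    }

    // Stores the document, replacing any earlier revision held for the same key.
    public void CreateDocument(Document document)
    {
        _docs[document.Id] = document;
    }
}
```

With a handler like this, IsMissing("k1", "1-abc") returns true until a document with that id and revision has been passed to CreateDocument, which mirrors the revs-diff and bulk-docs exchange described above.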

For demonstration purposes, I’ve also included an XmlReplicator class that implements this interface.  This class uses LINQ to XML to update and query an XML document that contains documents fed to it by the XDCR service.

public class XmlReplicator : IReplicationHandler
{
    private readonly string _path;
    private XDocument doc = new XDocument();

    public XmlReplicator(string path = @"C:\temp\replication.xml")
    {
        _path = path;
        if (!File.Exists(_path))
        {
            // Seed the file with an empty root element (name assumed) so XDocument.Load succeeds.
            File.WriteAllText(_path, "<documents />", Encoding.UTF8);
        }
    }


}

To check if the document exists, IsMissing will query the XML for a document element that has child id and rev elements matching the supplied key and revision.

public bool IsMissing(string key, string rev)
{
    var xml = XDocument.Load(_path);
    var documents = xml.Document.Root.Elements("document");
    var document = documents.Where(d => d.Element("meta").Element("rev").Value == rev && d.Element("meta").Element("id").Value == key);
    return document.Count() == 0;
}

Documents are created simply by adding them to the existing XML file.

public void CreateDocument(Document document)
{
    var xml = XDocument.Load(_path);

    var docElement = new XElement("document",
        new XElement("meta",
            new XElement("id", document.Id),
            new XElement("rev", document.Revision),
            new XElement("expiration", document.Expiration),
            new XElement("flags", document.Flags)
        ),
        new XElement("value", new XCData(document.Value))
    );

    xml.Document.Root.Add(docElement);
    xml.Save(_path);
}

By default, I’ve set up Nancy to use the ASP.NET development server.  You can host Nancy however you wish (IIS, self-hosted, etc.).  Once you have the project running, you’ll need to set up XDCR in the Couchbase admin console.  Some values to know:

  • Nancy port – 8675 (you can modify this in the project settings)
  • Nancy basic auth: Administrator:qwerty (modify this in XdcrUserValidator)

Finally, to plug in your own IReplicationHandler instance, simply register your type in the ApplicationBootstrapper (or delete the other implementations).

container.Register(new XmlReplicator());

Remember that Couchbase Labs projects aren’t fully supported, but feel free to post questions to the forums.

Author

Posted by John Zablocki, .NET SDK Developer, Couchbase

John Zablocki is a .NET SDK Developer at Couchbase. John is also the organizer of Beantown ALT.NET and a former adjunct at Fairfield University. You can also check out his book "Couchbase Essentials" on Amazon, which explains how to install and configure Couchbase Server.

One Comment

  1. Hi, this implementation could solve a huge use case in my project; what a wonderful job. Just one thing: after some trial and error I think I have made good progress, but the replication is giving me an error. I created the cluster and the replication, but when the system tries to replicate I get this error: Attention – 2017-04-09 03:30:47 127.0.0.1:ToplogyChangeDetector:missing vbopaque in _pre_replicate response. status_code=400 respMap=map[] vbno=400

    I think I am really close. Can you help me understand what went wrong? Thank you very much.
