How to Integrate MongoDB with Diffusion Cloud

MongoDB forms part of the NoSQL vanguard and is big in more than one sense, which is part of the attraction. It is very popular, in use at eBay, LinkedIn and Craigslist, and it excels at storing and organizing very large volumes of data.

That it stores its content as (what look and feel like) JSON documents makes it interesting from a Diffusion Cloud perspective. “How hard”, I asked myself, “would it be to reflect a MongoDB collection into the topic tree?” Not so hard, as it turns out.

The intent

MongoDB divides its storage into databases, which in turn are divided into collections, much as a conventional RDBMS uses tables. Our intent is to subscribe to a single collection and all changes to it, creating a discrete JSON topic for each document and updating each topic in sync with the original document.

Configuring MongoDB for replication

Our first obstacle: MongoDB doesn’t really do subscription in a pub/sub sense. So instead we make use of the replication feature, which is really intended for wiring together MongoDB replica sets. Plenty of third-party applications make use of it, however, including Meteor, which does include a more directly useful Publish/Subscribe feature in its product.

Replication is an optional feature, so it has to be deliberately configured. Typically MongoDB is run as a daemon, but for testing purposes we configure and run ours in the foreground.

https://gist.github.com/martin-cowie/9ab5fa34b9729ba0e77c421122df32ba

As well as starting your MongoDB server, this also outputs a lot of diagnostic noise, so start another shell for MongoDB interaction.

https://gist.github.com/martin-cowie/2e95106e46ab48d6d3821f0dd8a15553

The Mongo shell prompt testReplSet:PRIMARY confirms that we’re logged into the primary (and indeed only) member of the replica set. We then create 100 documents in a collection someDB.someCollection. This gives us our test subject.

Building the Adapter

All sources for the adapter are available on GitHub, but here we shall lavish some explanation on the significant parts.

Placing the connectivity

https://gist.github.com/martin-cowie/8920f8fbe953af7ca1f572ae79268473

We chose JCommander because we’ve used it before, and while http://mvnrepository.com/ shows nearly five times as many projects use Apache Commons CLI, JCommander’s annotation-based approach makes for quicker, cleaner code.

Having connected to MongoDB using Mongo’s own Java driver, we then arrange access to two collections: firstly the collection given on the command line, and secondly oplog.rs in the database local. The latter exists only when replication is configured, and is used as a sink for all storage mutation events. The OpLog is a “capped” collection, making it a little different to regular collections:

  • Documents can only be appended to a capped collection (though an identically sized document can be used to overwrite an existing document).
  • Capped collections are akin to circular buffers. They are bounded, and once all allocated storage is used, the oldest storage is reused.
  • Similarly to the way tail(1) works with files, a capped collection can entertain queries from ‘tailable’ cursors: when the cursor reaches the end of the collection, it waits while more documents are appended.
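The circular-buffer behaviour in the list above can be modelled in a few lines of plain Java. This is a toy illustration only and says nothing about how MongoDB implements capped collections; the class CappedLog and its methods are hypothetical names, and where a real tailable cursor blocks waiting for data, this model is simply polled from a remembered position.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a capped collection: bounded, append-only, oldest entries reused first. */
final class CappedLog {
    private final int capacity;
    private final Deque<String> entries = new ArrayDeque<>();
    private long appended = 0; // total number of documents ever appended

    CappedLog(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Append a document, discarding the oldest one when the buffer is full. */
    void append(String document) {
        if (entries.size() == capacity) {
            entries.removeFirst();
        }
        entries.addLast(document);
        appended++;
    }

    /** The position a tail-style reader should remember after reading everything. */
    long position() {
        return appended;
    }

    /** Return the surviving documents appended at or after the given position. */
    List<String> readFrom(long position) {
        long index = appended - entries.size(); // position of the oldest survivor
        List<String> result = new ArrayList<>();
        for (String document : entries) {
            if (index >= position) {
                result.add(document);
            }
            index++;
        }
        return result;
    }
}
```

A reader that calls position() and later readFrom() with that value sees only what was appended in between, which is roughly what tailing the OpLog gives the adapter.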

Building the Adapter

https://gist.github.com/martin-cowie/3a70fed41e11be03e33b5f812447be57

Adapter’s static build method gathers two key features. Firstly it obtains TopicUpdateControl, which is used to update topic values, and through it an exclusive ValueUpdater for a branch of the topic tree. Secondly it obtains TopicControl so that we can create and remove topics.

Now we build a MongoNamespace, encapsulating the database and collection used to listen for command events (specifically commands to drop the collection), before constructing and returning an Adapter object. Using a builder rather than a more complex constructor conveys many benefits (as covered in chapter 2 of Effective Java by Josh Bloch), but my motivations here are that the constructor’s intent is clear and that it is simpler to build an immutable (and therefore thread-safe) Adapter object.
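For readers unfamiliar with the builder idiom, here is a minimal sketch of it in plain Java. It does not reproduce the adapter’s actual build method: the class AdapterSketch, its three string fields and the method names are all hypothetical, chosen only to show how a builder yields an immutable object with every field final.

```java
import java.util.Objects;

/** Immutable value object assembled through a builder (all names hypothetical). */
final class AdapterSketch {
    private final String database;
    private final String collection;
    private final String topicRoot;

    // Private constructor: the only way in is through the builder.
    private AdapterSketch(Builder builder) {
        this.database = builder.database;
        this.collection = builder.collection;
        this.topicRoot = builder.topicRoot;
    }

    /** The "database.collection" form MongoDB uses to name a collection. */
    String namespace() {
        return database + "." + collection;
    }

    String topicRoot() {
        return topicRoot;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String database;
        private String collection;
        private String topicRoot;

        Builder database(String value) { this.database = value; return this; }
        Builder collection(String value) { this.collection = value; return this; }
        Builder topicRoot(String value) { this.topicRoot = value; return this; }

        /** Validate once, here, so that no half-built object ever escapes. */
        AdapterSketch build() {
            Objects.requireNonNull(database, "database");
            Objects.requireNonNull(collection, "collection");
            Objects.requireNonNull(topicRoot, "topicRoot");
            return new AdapterSketch(this);
        }
    }
}
```

A call such as AdapterSketch.builder().database("someDB").collection("someCollection").topicRoot("mongo").build() reads almost like named arguments, which is the clarity of intent mentioned above.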

Turning the handle

https://gist.github.com/martin-cowie/8b0afe16be26a3b22a7ae901e84e91b7

The real work can begin now both the connectivity and state are in place. In method run() we do two things:

  1. Query the MongoDB collection for all documents, transcribing each.
  2. Call relayChanges().

We use Java’s try-with-resources statement (available since Java 7) so that the cursor opened on line 10 is closed as soon as the thread drops out of that scope.
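As a reminder of how that statement behaves, here is a small self-contained sketch. CursorSketch is a hypothetical stand-in for a database cursor, not the MongoDB driver’s cursor type; anything implementing AutoCloseable is treated the same way.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a database cursor that must be closed after use. */
final class CursorSketch implements AutoCloseable, Iterator<String> {
    private final Iterator<String> source;
    private boolean closed = false;

    CursorSketch(List<String> documents) {
        this.source = documents.iterator();
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public String next() {
        return source.next();
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    /** Drain the cursor; close() runs on leaving the try block, even on exception. */
    static int countAll(CursorSketch cursor) {
        int count = 0;
        try (CursorSketch c = cursor) {
            while (c.hasNext()) {
                c.next();
                count++;
            }
        }
        return count;
    }
}
```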

Method transcribeDocument() has the task of building a JSON topic from the content of the MongoDB document. If the topic has already been created (and therefore exists in topicPaths) we update it rather than create it. Either branch makes use of method toBytes(String), which parses a JSON string down to a form suitable for the ValueUpdater.
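The create-or-update decision can be sketched without any Diffusion classes. In the sketch below a HashMap stands in for the topic tree, the class TranscribeSketch and its method names are hypothetical, and toBytes is only a placeholder that encodes the string as UTF-8 rather than the JSON conversion the adapter performs.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Create-or-update logic with a map standing in for the topic tree. */
final class TranscribeSketch {
    enum Action { CREATED, UPDATED }

    private final String topicRoot;
    // Stand-in for the topic tree: topic path -> current value.
    private final Map<String, byte[]> topics = new HashMap<>();

    TranscribeSketch(String topicRoot) {
        this.topicRoot = topicRoot;
    }

    /** One topic per document, addressed by the document id beneath the root. */
    Action transcribe(String documentId, String json) {
        String path = topicRoot + "/" + documentId;
        Action action = topics.containsKey(path) ? Action.UPDATED : Action.CREATED;
        topics.put(path, toBytes(json));
        return action;
    }

    /** Placeholder conversion (assumption): plain UTF-8 bytes of the JSON text. */
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** The value currently held for a document's topic, or null if none exists. */
    String currentValue(String documentId) {
        byte[] value = topics.get(topicRoot + "/" + documentId);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```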

Relaying the changes

Method relayChanges(long) does what it says on the tin: it relays changes from the MongoDB instance to the topic tree. Before listening to the OpLog for events it composes a filter to use. In this case we wish only to see events later than ‘now’ that are document insert, update or delete events, or commands to drop the collection.
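The conditions in that filter can be modelled as an ordinary Java predicate. This is a sketch of the filter’s logic, not the query object the adapter hands to the driver: OpLog events are represented as plain maps, the timestamp is assumed to be a single long, and the class name OplogFilterSketch is hypothetical. The field names ts, op and o, and the operation codes i, u, d and c, are those MongoDB uses in OpLog entries.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Models the OpLog filter as a predicate over map-shaped events. */
final class OplogFilterSketch {
    // i = insert, u = update, d = delete.
    private static final Set<String> DOCUMENT_OPS =
        new HashSet<>(Arrays.asList("i", "u", "d"));

    /** Accept document events and drop commands stamped later than startTs. */
    static Predicate<Map<String, Object>> laterThan(long startTs) {
        return event -> {
            Object ts = event.get("ts");
            if (!(ts instanceof Long) || (Long) ts <= startTs) {
                return false; // missing, malformed or stale timestamp
            }
            Object op = event.get("op");
            if (DOCUMENT_OPS.contains(op)) {
                return true;
            }
            // c = command; keep only those whose body carries a "drop" key.
            Object body = event.get("o");
            return "c".equals(op)
                && body instanceof Map
                && ((Map<?, ?>) body).containsKey("drop");
        };
    }
}
```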

Interestingly BsonTimestamp resolves time only to the second, and nothing smaller. It uses an incrementing counter to disambiguate events within that second.
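A BSON timestamp is a single 64-bit value, with seconds since the epoch in the high 32 bits and the counter in the low 32 bits. The sketch below packs and unpacks such a value in plain Java to show why two events in the same second still order correctly; the class OplogTimestampSketch and its method names are hypothetical, not part of the MongoDB driver.

```java
/** Packs a BSON-style timestamp: high 32 bits seconds, low 32 bits counter. */
final class OplogTimestampSketch {
    private OplogTimestampSketch() {
    }

    static long pack(int seconds, int increment) {
        // Mask the increment so a negative int cannot smear into the high bits.
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    static int seconds(long value) {
        return (int) (value >> 32);
    }

    static int increment(long value) {
        return (int) value;
    }
}
```

Because the seconds occupy the high bits, comparing two packed values orders events by second first and by counter second.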

We make the call to oplog.find(filter), build a TailableAwait cursor, and then read incoming events as they happen. We dispatch each event to processInsert(Document), processUpdate(Document), processDelete(Document) or processCommand(Document), depending on the value of property op (presumably short for operation).

Methods processUpdate(Document), processDelete(Document) and processCommand(Document) check that the event relates to the adapted collection. A more focussed filter here would both perform better and eliminate the need for this step.
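The dispatch on op amounts to a switch over four single-letter codes. In this sketch the event is a plain map and the method merely returns the name of the handler that would be invoked; OplogDispatchSketch is a hypothetical class, and the real adapter calls the handlers rather than naming them.

```java
import java.util.Map;

/** Routes a map-shaped OpLog event to a handler name based on its "op" code. */
final class OplogDispatchSketch {
    private OplogDispatchSketch() {
    }

    static String dispatch(Map<String, Object> event) {
        String op = String.valueOf(event.get("op"));
        switch (op) {
            case "i": return "processInsert";
            case "u": return "processUpdate";
            case "d": return "processDelete";
            case "c": return "processCommand";
            default:  return "ignored"; // e.g. "n", a no-op entry
        }
    }
}
```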
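The namespace check could look something like the sketch below, again over map-shaped events with a hypothetical class name. It relies on two OpLog conventions: document events carry ns as “database.collection”, while a drop command carries ns as “database.$cmd” and names the collection in its body. Folding the same conditions into the query filter is what the more focussed filter would amount to.

```java
import java.util.Map;

/** Decides whether a map-shaped OpLog event concerns one particular collection. */
final class NamespaceSketch {
    private NamespaceSketch() {
    }

    static boolean concerns(Map<String, Object> event, String database, String collection) {
        Object ns = event.get("ns");
        if ("c".equals(event.get("op"))) {
            // Commands are logged against "<database>.$cmd"; a drop names the
            // collection in its body, e.g. { drop: "someCollection" }.
            Object body = event.get("o");
            return (database + ".$cmd").equals(ns)
                && body instanceof Map
                && collection.equals(((Map<?, ?>) body).get("drop"));
        }
        // Insert, update and delete events name the collection directly.
        return (database + "." + collection).equals(ns);
    }
}
```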

Method processInsert(Document) mostly reuses transcribeDocument() to update a topic with the new content embedded in the event.

Method processDelete(Document) uses the MongoDB document id to look up the affected topic, and then removes it.

Method processUpdate(Document) is more interesting, as the event contains instructions to change the document but not the changed document itself. A more fully realised adapter implementation would interpret and react to the changes; in this case, however, we fetch the changed document from the collection using its ObjectId and pass that to transcribeDocument to update the existing topic. This imposes the cost of an extra round-trip, for the sake of readability.
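A sketch of that fetch-again approach follows. It assumes the update event identifies its target through an _id inside the o2 field, as OpLog update entries do, and it uses a plain map in place of the MongoDB collection; UpdateSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Re-reads the whole document named by an update event instead of applying its changes. */
final class UpdateSketch {
    private UpdateSketch() {
    }

    /** Pull the target document's _id out of the event's "o2" field, if present. */
    static Optional<Object> targetId(Map<String, Object> event) {
        Object selector = event.get("o2");
        if (selector instanceof Map) {
            return Optional.ofNullable(((Map<?, ?>) selector).get("_id"));
        }
        return Optional.empty();
    }

    /**
     * Look the document up again by id. The result is empty when the event has
     * no usable id or the document has disappeared in the meantime.
     */
    static Optional<String> refetch(Map<String, Object> event, Map<Object, String> collection) {
        return targetId(event).map(collection::get);
    }
}
```

The lookup in refetch is the extra round-trip: the instructions in the event are ignored and the current state of the document is transcribed instead.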

Summing up

Above we have shown how little code is actually required to get started. As this is written, the GitHub repository holds only 426 lines of Java. In the next blog post we shall show how the adapter can be put to use.

