Building on Value Streams

In Diffusion™ 5.7 we introduced data types, value streams, Binary topics and JSON topics. Together they streamline much of the work that previously would have required application code, but they still don’t do everything.

Value streams are an enhanced version of topic streams that receive typed values instead of Content objects that might contain a delta to a value. A stream of values is easier to work with. When you add a value stream you specify the type of values you want to receive. More types are coming, but for now you will probably work with Binary or JSON. Binary values can be anything; they are an ordered sequence of bytes that your application is responsible for interpreting. JSON values can be any valid JSON; no schema is associated with a JSON topic by Diffusion.

The values you receive can still be awkward to work with in the Java API. Binary values wrap a byte array, but you need to deserialise the byte array into something that your program can understand. JSON values do not provide any methods to inspect the value. If you receive a JSON value containing an object with three properties, you cannot get the value for any of the keys. You need to convert the value into something else, like a Java Map, to look up a key.

If the following looks like a lot of boilerplate, we’ve made a library that takes care of it: Diffusion Transform, available on GitHub.

How can I convert a JSON value to an object?

You receive a JSON value from Diffusion and you want to bind it to an object. JSON is supported in Java through many libraries, but the one I’m going to use is Jackson. Why Jackson? Because the JSON value wraps a CBOR-encoded byte array and Jackson has support for CBOR.

https://gist.github.com/push-gists/23de3557bbdac83509e717e4829be04c

Five lines is a lot of code to turn JSON into a map of objects, but much of it can be reused. Parsing the CBOR directly is an advantage: when a JSON value is turned into a string, Diffusion parses the CBOR, much as the code above does, and then stringifies the result. Parsing that string again to bind it to an object would be wasted effort.

This can be refactored into a method that takes a JSON value and returns a map. Many Jackson objects can be reused, and using them (though not configuring them) is thread-safe, so we can construct them once instead of for every conversion.

https://gist.github.com/push-gists/a01f11981fd60fd42fd18397b31b8b21

We can add more methods to this class whenever we want to use Jackson to convert a JSON value in a different way.

How can I create a stream that binds JSON values for me?

We could create a stream of JSON values and immediately call our code to bind each value to an object. That would be repetitive and would pollute every stream with value binding code. What we really want is a stream of bound object values.

The adapter pattern feels like a good fit. We can create a value stream that binds the JSON values and then delegates the handling to a value stream of object values.

There is a problem, though. You may have noticed that the parsing and binding code can throw an IOException. The delegate needs to be notified of the problem, so we extend ValueStream with a callback that receives exceptions.

https://gist.github.com/push-gists/2a4f855ad1266ef6a4f72e4aa8bfa181

Converting a JSON value will throw an exception if the input stream does not return valid CBOR or if the value cannot be represented as a map. Value streams filter by data type, so a JSON value stream is only notified about JSON topics; Binary topics that match the stream’s selector are ignored. A JSON topic whose value cannot be represented as a map results in a call to the error handler.

Now that we have an adaptee, we need to implement our adapter. To keep the blog from filling up with too much code we will focus on the onValue notifications, but the other notifications need to be delegated as well.

https://gist.github.com/push-gists/725a3cd0df0fa11a2735dcca0bbcb143
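As a rough illustration of the adaptee and adapter, here is a minimal plain-Java sketch. It uses simplified, hypothetical stand-ins rather than Diffusion’s own interfaces: `SimpleValueStream` has only an `onValue(topicPath, oldValue, newValue)` callback (the real `ValueStream` has more parameters and more notifications), `ErrorAwareValueStream` adds an `onError` callback, and the conversion is passed in as a hypothetical `Converter` so that the sketch needs no JSON library. In this post, that conversion is the jsonToMap method.

```java
import java.io.IOException;

// Simplified, hypothetical stand-in for Diffusion's ValueStream: only the
// onValue callback, with fewer parameters than the real interface.
interface SimpleValueStream<V> {
    void onValue(String topicPath, V oldValue, V newValue);
}

// The adaptee: a value stream that can also be told that conversion failed.
interface ErrorAwareValueStream<V> extends SimpleValueStream<V> {
    void onError(String topicPath, Exception error);
}

// Hypothetical conversion that may fail, playing the role of jsonToMap.
@FunctionalInterface
interface Converter<S, T> {
    T convert(S source) throws IOException;
}

// The adapter: receives raw values, converts them and delegates the results.
final class ConvertingValueStream<S, T> implements SimpleValueStream<S> {
    private final Converter<S, T> converter;
    private final ErrorAwareValueStream<T> delegate;

    ConvertingValueStream(Converter<S, T> converter, ErrorAwareValueStream<T> delegate) {
        this.converter = converter;
        this.delegate = delegate;
    }

    @Override
    public void onValue(String topicPath, S oldValue, S newValue) {
        final T convertedOld;
        final T convertedNew;
        try {
            // There is no old value on the first notification for a topic.
            convertedOld = oldValue == null ? null : converter.convert(oldValue);
            convertedNew = converter.convert(newValue);
        } catch (IOException e) {
            // Report the failure instead of passing on a value.
            delegate.onError(topicPath, e);
            return;
        }
        delegate.onValue(topicPath, convertedOld, convertedNew);
    }
}
```

Catching the exception in the adapter keeps the delegate free of parsing code: it only ever sees converted values or an error, never the raw JSON.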

Right, we now have a JSON value stream that can delegate to a stream of maps. Let’s add the stream and see how it looks.

https://gist.github.com/push-gists/858e25961721dc5d34e3ab256d5c9762

How can I manipulate these values further?

So far we have been working with JSON values that can be represented as a map. We can now add a stream that takes a JSON value, transforms it into a map and handles any errors. Jackson is not limited to maps: it can also use reflection to bind JSON values to POJOs, or build a traversable tree representing the JSON value. We want to be able to work with these different types of values.

If we inject an object similar to a Function, but one that can throw an exception, in place of the jsonToMap method, we can generalise these classes to handle any conversion between value types. This lets us build up a collection of reusable Transformers that can be chained together.

https://gist.github.com/push-gists/a36ac6517c656635c789fde123a2ba3d
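A minimal sketch of such an object, assuming a hypothetical `Transformer` interface (not necessarily the shape used in the gist or in Diffusion Transform): it mirrors `java.util.function.Function`, but its `transform` method may throw, and a default `andThen` method chains two transformers the same way `Function.andThen` does.

```java
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical Transformer: like java.util.function.Function, except that
 * transform may throw a checked exception.
 */
@FunctionalInterface
interface Transformer<S, T> {
    T transform(S value) throws Exception;

    // Chains two transformers: apply this one, then pass its result to next.
    // An exception from either step propagates to the caller.
    default <R> Transformer<S, R> andThen(Transformer<? super T, ? extends R> next) {
        Objects.requireNonNull(next);
        return value -> next.transform(transform(value));
    }

    // Adapts an ordinary Function, which cannot throw checked exceptions.
    static <S, T> Transformer<S, T> fromFunction(Function<S, T> function) {
        Objects.requireNonNull(function);
        return function::apply;
    }
}
```

Because each step is an ordinary object, a JSON-to-map step, a map-to-POJO step and so on can be written once and combined as needed.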

This code is slightly messy, with a little too much boilerplate, which we can eliminate using a fluent builder.

https://gist.github.com/push-gists/d054ec077c1a9e71acb4378e0b22db04
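To give a feel for the fluent style, here is a sketch of a hypothetical builder. `TransformedStreamBuilder`, `Step`, `transformWith` and `create` are illustrative names, not the API of the gist or of Diffusion Transform, and the finished handler is a plain `Consumer` rather than a Diffusion value stream, so the sketch runs without a Diffusion session.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Hypothetical fluent builder for a transforming stream. Each transformWith
 * call returns a new builder with one more step; create() produces a handler
 * that runs every step and routes the result or the failure to a callback.
 */
final class TransformedStreamBuilder<S, T> {

    // One conversion step; unlike Function, it may throw a checked exception.
    @FunctionalInterface
    interface Step<A, B> {
        B apply(A value) throws Exception;
    }

    private final Step<S, T> pipeline;

    private TransformedStreamBuilder(Step<S, T> pipeline) {
        this.pipeline = pipeline;
    }

    // The class argument only fixes the source type; values start unchanged.
    static <S> TransformedStreamBuilder<S, S> of(Class<S> sourceType) {
        return new TransformedStreamBuilder<S, S>(value -> value);
    }

    // Appends a step; the existing builder is left untouched.
    <R> TransformedStreamBuilder<S, R> transformWith(Step<? super T, ? extends R> next) {
        Step<S, T> current = pipeline;
        return new TransformedStreamBuilder<S, R>(value -> next.apply(current.apply(value)));
    }

    // Builds the handler: successes go to onValue, failures go to onError
    // together with the input that could not be transformed.
    Consumer<S> create(Consumer<? super T> onValue, BiConsumer<? super S, Exception> onError) {
        Step<S, T> steps = pipeline;
        return value -> {
            T result;
            try {
                result = steps.apply(value);
            } catch (Exception e) {
                onError.accept(value, e);
                return;
            }
            onValue.accept(result);
        };
    }
}
```

For example, `TransformedStreamBuilder.of(String.class).transformWith(String::trim).transformWith(Integer::parseInt).create(results::add, (input, error) -> failures.add(input))` yields a handler that turns `" 4 "` into `4` and reports `"four"` to the error callback. Returning a new builder from each call keeps partially built pipelines reusable.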

It is also possible to go the other way. You can define Transformers that create JSON values from other types of values. You can then use these Transformers to publish values to topics.

https://gist.github.com/push-gists/91db08c027ef04bdd5c03f4b19540f03
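Going the other way can be sketched in the same style. The `Price` type, the `ToJson` interface and the `TransformingPublisher` class are hypothetical, and the `update` function stands in for whatever Diffusion call the application uses to publish to a topic. The hand-written serialiser exists only to keep the sketch free of dependencies; it does no string escaping, and in practice Jackson would do this job.

```java
import java.util.function.BiConsumer;

// Hypothetical value type that we want to publish.
final class Price {
    final String symbol;
    final double bid;

    Price(String symbol, double bid) {
        this.symbol = symbol;
        this.bid = bid;
    }

    // Hand-written serialiser for the sketch only: it does not escape strings.
    static final ToJson<Price> TO_JSON =
        p -> "{\"symbol\":\"" + p.symbol + "\",\"bid\":" + p.bid + "}";
}

// A transformer pointing the other way: from an object to JSON text.
@FunctionalInterface
interface ToJson<T> {
    String toJson(T value) throws Exception;
}

// Hypothetical publisher: transforms each value and passes the JSON text to
// an update function (topicPath, json) supplied by the application.
final class TransformingPublisher<T> {
    private final ToJson<T> transformer;
    private final BiConsumer<String, String> update;

    TransformingPublisher(ToJson<T> transformer, BiConsumer<String, String> update) {
        this.transformer = transformer;
        this.update = update;
    }

    void publish(String topicPath, T value) throws Exception {
        // If the transformation fails, nothing is published and the caller sees the exception.
        update.accept(topicPath, transformer.toJson(value));
    }
}
```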

Why not make this a core part of the API?

In Java, this is very opinionated code. It uses a third-party library to provide a particular flavour of data binding, and imposing a choice of third-party JSON library is not appropriate for our core API.

To save you from having to copy snippets from the blog, these ideas have been pulled together into a library called Diffusion Transform, available on GitHub.

You should take a look at these examples to see some ways transforming value streams can be used.
