Building on Value Streams
September 16, 2016 | Matthew Champion
In DiffusionTM 5.7 we introduced data types, value streams, Binary
topics and JSON
topics. Together they streamline much of the work that previously would have required application code, but they still don’t do everything.
Value streams are an enhanced version of topic streams that receive typed values instead of Content
objects that might contain a delta to a value. A stream of values is easier to work with. When you add a value stream you specify the type of values you want to receive. More types are coming, but for now you will probably work with Binary
or JSON
. Binary
values can be anything; they are an ordered sequence of bytes that your application is responsible for interpreting. JSON
values can be any valid JSON; no schema is associated with a JSON
topic by Diffusion.
The values you receive can still be awkward to work with in the Java API. Binary
values wrap a byte array, but you need to deserialise the byte array to something that your program can understand. JSON
values do not provide any methods to inspect the value. If you receive a JSON
value containing an object with three properties, you cannot get the value for any of the keys. You need to convert the value into something else, like a Java Map
, to lookup a key.
If the following looks like a lot of boilerplate, we’ve made a library to take care of it, Diffusion Transform available on GitHub.
How can you convert a JSON value to an object?
You receive a JSON
value from Diffusion and you want to bind it to an object. JSON is supported in Java through many libraries, but the one I’m going to use is Jackson. Why Jackson? Because the JSON
value wraps a CBOR-encoded byte array and Jackson has support for CBOR.
https://gist.github.com/push-gists/23de3557bbdac83509e717e4829be04c
Five lines is a lot of code to turn JSON into a map of objects, but much of this can be reused. Being able to parse CBOR is an advantage because when a JSON
value is turned into a string Diffusion parses the CBOR, similar to the above, and then stringifies it. Parsing the string to bind it to an object is wasted effort.
That can be refactored until you have a method you can call with a JSON
value to get a map. Many Jackson objects can be reused and their use (but not configuration) is thread safe so we can reduce the number of objects we need to construct for each conversion.
https://gist.github.com/push-gists/a01f11981fd60fd42fd18397b31b8b21
We can add more methods to this whenever we want to use Jackson to convert a JSON
value in a different way.
How can I create a stream that binds JSON values for me?
We could create a stream of JSON
values and immediately call our code to bind it to an object. That would be repetitive and would pollute every stream with value binding code. Really we want to create a stream of bound object values.
The adapter pattern feels like a good fit. We can create a value stream that binds the JSON
values and then delegates the handling to a value stream of object values.
There is a problem though. You may have noticed that the parsing and binding code might throw an IOException
. The delegate will need to be notified of the problem by extending the ValueStream
to receive exceptions.
https://gist.github.com/push-gists/2a4f855ad1266ef6a4f72e4aa8bfa181
Converting a JSON
value will throw an exception if the input stream does not return valid CBOR or if the value cannot be represented as a map. Value streams filter out different data types so a JSON
value stream will only be notified about JSON
topics. Binary topics that match the selector for the stream are ignored. JSON
topics with values that cannot be represented as a map call the error handler. Now we have an adaptee we need to implement our adapter. To keep the blog from filling up with too much code we will focus on the onValue
notifications, but the other notifications need to be delegated as well.
https://gist.github.com/push-gists/725a3cd0df0fa11a2735dcca0bbcb143
Right, we now have a JSON
value stream that can delegate to a stream of maps. Let’s add the stream and see how it looks.
https://gist.github.com/push-gists/858e25961721dc5d34e3ab256d5c9762
How can I manipulate these values further?
So far we have been working with JSON
values that can be represented as a map. We can now add a stream that takes a JSON
value, transforms it into a map and handles any errors. Jackson is not limited to maps it also supports using reflection to bind JSON
values into POJOs or creating a traversable tree representing the JSON
value. We want to be able to work with these different types of values.
If we inject an object similar to a Function (but throwing an exception) that can take the place of the jsonToMap
method then we can generalise these classes and handle any type of conversion between values. This lets us build up a collection of reusable Transformer
s that can be chained together.
https://gist.github.com/push-gists/a36ac6517c656635c789fde123a2ba3d
This is slightly messy code with a little too much boilerplate that we can eliminate using a fluent builder.
https://gist.github.com/push-gists/d054ec077c1a9e71acb4378e0b22db04
It is also possible to go the other way. You can define Transformer
s that create JSON
values from other types of values. You can then use these Transformer
s to publish values to topics.
https://gist.github.com/push-gists/91db08c027ef04bdd5c03f4b19540f03
Why not make this a core part of the API?
In Java this is very opinionated code. It uses a third party library to provide a certain flavour of data binding. Imposing a choice on which third party library for JSON processing to use is something not appropriate for our core API.
To save you from having to copy snippets from the blog these ideas have been pulled together into a library called Diffusion Transform available on GitHub.
You should take a look at these examples to see some ways transforming value streams can be used.
Further reading
BLOG
100 million updates per second - Landmark Diffusion cluster performance
July 02, 2024
Read More about 100 million updates per second - Landmark Diffusion cluster performance/span>
BLOG
React PubSub using Diffusion Websocket Server
July 08, 2024
Read More about React PubSub using Diffusion Websocket Server/span>
BLOG
Benchmarking and scaling subscribers
March 15, 2024