Conditional topic updates
September 7, 2023 | Adam Turnbull
Introduction
In Diffusion 6.10, we have released an extension to the UpdateConstraint feature that allows you to update JSON topics only if part of the content matches a particular condition. Previously, the update would only take effect if the value matched exactly; now we can use comparison operations such as less-than and greater-than.
This provides a mechanism to prevent out-of-order updates to topics, and to ensure that you do not overwrite new data with old data. This might be a problem when:
- The input stream contains unordered data, and you want to make sure that you always update a topic with a later value. Any messages that are older than the current value are discarded.
- You have more than one publisher sending updates to a topic and you can’t guarantee the order in which they update the topic. You would like to avoid race conditions where old values overwrite newer ones.
Example use cases
Dropping out-of-order messages
We have a publisher sending data from a third-party system to Diffusion, but we are unable to guarantee the order of updates.
We might see updates arrive out of sequence. The client receives updates in exactly the order in which they reach the server, so when an older value arrives after a newer one, the server (and therefore the client) will for a short time not have the true value from the source system.
Diffusion can’t reorder the messages, but with conditional updates we could ensure that we do not update a topic with an older copy of the source data. This way, the client always has the most recent value:
Multiple overlapping producers
In another situation, you might have multiple independent producers that are updating the same topic. Again, you don’t want to overwrite a topic with an older value than it currently contains. Consider two producers, sending the same data at slightly different times:
Helpfully, as long as you are not using the DONT_RETAIN_VALUE flag, Diffusion doesn’t send an update when there is no change to a topic, so the client doesn’t get all 8 possible updates. But for some period of time, value 4 from the green producer is overwritten by value 3 from the red producer, and old data is given to the client. Clearly, we don’t want this to happen!
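The effect of two overlapping producers can be sketched with a purely local model. Nothing here calls Diffusion: the class name, the `replay` helper and the interleaving of arrivals are all assumptions made for illustration. The unguarded run only suppresses unchanged values, while the guarded run also rejects any value that is not newer than the current one.

```java
import java.util.ArrayList;
import java.util.List;

/** Local model only: no Diffusion calls are made here. */
final class OverlappingProducers {

    /** Replays arrivals against one in-memory value and returns what a subscriber would see. */
    static List<Integer> replay(int[] arrivals, boolean guarded) {
        final List<Integer> delivered = new ArrayList<>();
        Integer current = null; // null models a topic that has no value yet
        for (int incoming : arrivals) {
            if (guarded && current != null && incoming <= current) {
                continue; // stale or duplicate value: the update is rejected
            }
            if (current == null || current != incoming) {
                current = incoming;
                delivered.add(incoming); // only real changes reach the subscriber
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Hypothetical interleaving of two producers, each sending 1 to 4.
        final int[] arrivals = {1, 1, 2, 2, 3, 4, 3, 4};
        System.out.println(replay(arrivals, false)); // prints [1, 2, 3, 4, 3, 4]
        System.out.println(replay(arrivals, true));  // prints [1, 2, 3, 4]
    }
}
```

In the unguarded run the subscriber briefly sees 3 again after 4, which is the stale overwrite described above.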
Example usage
Let’s take a sample series of messages with which we want to update a topic:
    { "seqnum": 1, "price" : 1.50 }
    { "seqnum": 2, "price" : 1.52 }
    { "seqnum": 4, "price" : 1.54 }
    { "seqnum": 3, "price" : 1.53 }
    { "seqnum": 5, "price" : 1.55 }
Like our first use case, we have data here that is not in order. We will use UpdateConstraints to make sure that the message with seqnum = 3 does not overwrite the topic after seqnum = 4. The rules that govern updating the topic are:
- If the topic does not exist, we can add it.
- If the topic exists, but has no data, we can update it.
- If the topic exists and has data, the seqnum field must contain a value that is less than the one in the message we are using for the update.
To do this, we create an UpdateConstraint with the following rules:
    final UpdateConstraint constraint =
        Diffusion.updateConstraints()
            .noTopic()
            .or(Diffusion.updateConstraints().noValue())
            .or(Diffusion.updateConstraints().jsonValue()
                .with("/seqnum",
                    UpdateConstraint.Operator.LT,
                    thisSeqNum));
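To see which messages these rules would let through, the three cases can be modelled locally without a server. This is a sketch under assumptions: `TopicState`, `permits` and `accepted` are hypothetical names, and the model only mirrors the rules as described above rather than the server's implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class SeqnumRule {

    /** Hypothetical stand-in for the state of a topic on the server. */
    static final class TopicState {
        boolean exists;  // false until the first successful add
        Integer seqnum;  // null while the topic has no value
    }

    /** Mirrors the three rules: no topic, no value, or current seqnum less than the incoming one. */
    static boolean permits(TopicState topic, int incomingSeqnum) {
        if (!topic.exists) {
            return true;                          // corresponds to noTopic()
        }
        if (topic.seqnum == null) {
            return true;                          // corresponds to noValue()
        }
        return topic.seqnum < incomingSeqnum;     // corresponds to /seqnum LT incoming
    }

    /** Applies the messages in arrival order and returns the seqnums that were accepted. */
    static List<Integer> accepted(int[] seqnums) {
        final TopicState topic = new TopicState();
        final List<Integer> result = new ArrayList<>();
        for (int seqnum : seqnums) {
            if (permits(topic, seqnum)) {
                topic.exists = true;
                topic.seqnum = seqnum;
                result.add(seqnum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(accepted(new int[] {1, 2, 4, 3, 5})); // prints [1, 2, 4, 5]
    }
}
```

The message with seqnum 3 is the only one dropped, because the topic already holds seqnum 4 when it arrives.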
Available condition operators
LT is not the only comparison operator available. Here is a full list:
Operator | Description |
---|---|
IS | Is exactly the same |
EQ | Logically equal to |
NE | Logically not equal to |
GT | Greater than |
GE | Greater than or equal to |
LT | Less than |
LE | Less than or equal to |
In all cases except for IS, a value that can be interpreted as a number (such as a Long or a Double) is treated as a number, even when it is formatted as a string. For example, LT 5 is interpreted the same as LT "5.0".
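The numeric interpretation can be illustrated with a small local model. This is not Diffusion's implementation: `OperandCoercion`, `asNumber` and `test` are hypothetical names, values are represented as their JSON token text, and the sketch refuses non-numeric operands because the text above does not say how those are ordered.

```java
import java.math.BigDecimal;

final class OperandCoercion {

    enum Op { GT, GE, LT, LE }

    /** Removes surrounding double quotes, then parses the token; returns null if it is not numeric. */
    static BigDecimal asNumber(String token) {
        String text = token.trim();
        if (text.length() >= 2 && text.startsWith("\"") && text.endsWith("\"")) {
            text = text.substring(1, text.length() - 1);
        }
        try {
            return new BigDecimal(text);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Evaluates "topicValue op operand" after coercing both sides to numbers. */
    static boolean test(String topicValue, Op op, String operand) {
        final BigDecimal left = asNumber(topicValue);
        final BigDecimal right = asNumber(operand);
        if (left == null || right == null) {
            throw new IllegalArgumentException("non-numeric comparison is not modelled");
        }
        final int order = left.compareTo(right);
        switch (op) {
            case GT: return order > 0;
            case GE: return order >= 0;
            case LT: return order < 0;
            default: return order <= 0; // LE
        }
    }
}
```

With this model, `test("4", Op.LT, "5")` and `test("4", Op.LT, "\"5.0\"")` give the same answer, matching the example in the text.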
The difference between IS and EQ is that IS performs an exact comparison at the byte level, whereas EQ will check to see if the current and new values are equivalent:
Value | 1.2 | 1.20 | "1.20" |
---|---|---|---|
1.2 | IS + EQ | EQ | EQ |
1.20 | EQ | IS + EQ | EQ |
"1.20" | EQ | EQ | IS + EQ |
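The table can be reproduced with a rough local model. As an assumption for illustration only, IS is modelled as identical token text (a stand-in for the byte-level comparison) and EQ as numeric equivalence; the class and method names are hypothetical, and only numeric tokens are handled.

```java
import java.math.BigDecimal;

final class IsVersusEq {

    /** IS, modelled as identical token text (stand-in for a byte-level comparison). */
    static boolean is(String a, String b) {
        return a.equals(b);
    }

    /** EQ, modelled as numeric equivalence; throws NumberFormatException for non-numeric tokens. */
    static boolean eq(String a, String b) {
        return new BigDecimal(unquote(a)).compareTo(new BigDecimal(unquote(b))) == 0;
    }

    /** Removes one pair of surrounding double quotes, if present. */
    static String unquote(String token) {
        if (token.length() >= 2 && token.startsWith("\"") && token.endsWith("\"")) {
            return token.substring(1, token.length() - 1);
        }
        return token;
    }

    public static void main(String[] args) {
        final String[] values = {"1.2", "1.20", "\"1.20\""};
        for (String row : values) {
            for (String column : values) {
                System.out.println(row + " vs " + column
                    + ": IS=" + is(row, column) + ", EQ=" + eq(row, column));
            }
        }
    }
}
```

`BigDecimal.compareTo` is used rather than `equals` because `equals` also compares scale, so it would treat 1.2 and 1.20 as different.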
Full example
The following class shows how to send JSON messages as per the above example.
    package com.diffusiondata.examples;

    import com.pushtechnology.diffusion.client.Diffusion;
    import com.pushtechnology.diffusion.client.features.TopicUpdate;
    import com.pushtechnology.diffusion.client.features.UpdateConstraint;
    import com.pushtechnology.diffusion.client.session.Session;
    import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
    import com.pushtechnology.diffusion.client.topics.details.TopicType;
    import com.pushtechnology.diffusion.datatype.json.JSON;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class Main {
        public static void main(String[] args) throws Exception {
            final Session session = Diffusion.sessions()
                .principal("admin")
                .password("password")
                .open("ws://localhost:8090");

            final TopicSpecification topicSpec =
                Diffusion.newTopicSpecification(TopicType.JSON);

            // A sequence of JSON-formatted messages to send to Diffusion.
            // Note the out-of-order seqnum 3.
            final String[] messages = {
                "{\"seqnum\": 1, \"price\": 1.50}",
                "{\"seqnum\": 2, \"price\": 1.52}",
                "{\"seqnum\": 4, \"price\": 1.54}",
                "{\"seqnum\": 3, \"price\": 1.53}",
                "{\"seqnum\": 5, \"price\": 1.55}",
            };

            final ObjectMapper mapper = new ObjectMapper();

            for (String thisMessage : messages) {
                // Extract the sequence number that the constraint compares against.
                final JsonNode node = mapper.readTree(thisMessage);
                final int thisSeqNum = node.get("seqnum").asInt();

                // Allow the update if there is no topic, no value,
                // or the topic's seqnum is less than this message's seqnum.
                final UpdateConstraint constraint =
                    Diffusion.updateConstraints()
                        .noTopic()
                        .or(Diffusion.updateConstraints().noValue())
                        .or(Diffusion.updateConstraints().jsonValue()
                            .with("/seqnum", UpdateConstraint.Operator.LT, thisSeqNum));

                final JSON myJsonMessage =
                    Diffusion.dataTypes().json().fromJsonString(thisMessage);

                session.feature(TopicUpdate.class)
                    .addAndSet("my-topic", topicSpec, JSON.class, myJsonMessage, constraint)
                    .whenComplete((result, err) -> {
                        if (err == null) {
                            System.out.println("Updated topic with seqnum=" + thisSeqNum);
                        } else {
                            System.err.println("Failed to update topic with seqnum="
                                + thisSeqNum + ": " + err.getMessage());
                        }
                    });
            }

            // Give some time for futures to complete
            Thread.sleep(1000);

            // Done.
            session.close();
        }
    }
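The fixed one-second sleep in the example is a simplification. One alternative, assuming the update call returns a CompletableFuture (as its use with whenComplete suggests), is to collect the futures and wait for all of them before closing the session. The sketch below uses a hypothetical `fakeUpdate` method in place of the real update call so that it runs without a server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AwaitUpdates {

    /** Hypothetical stand-in for an asynchronous topic update. */
    static CompletableFuture<String> fakeUpdate(int seqnum) {
        return CompletableFuture.supplyAsync(() -> "seqnum=" + seqnum);
    }

    /** Starts every update, waits for all of them, and returns one outcome per update. */
    static List<String> sendAll(int[] seqnums) {
        final List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int seqnum : seqnums) {
            // Convert a failure into a message so that one rejected update
            // does not prevent waiting for the others.
            pending.add(fakeUpdate(seqnum)
                .exceptionally(err -> "failed: " + err.getMessage()));
        }

        // Block until every future has completed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        final List<String> outcomes = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            outcomes.add(future.join());
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(new int[] {1, 2, 4, 3, 5}));
    }
}
```

Waiting on the futures means the program closes the session as soon as the last update has been acknowledged or rejected, rather than after an arbitrary delay.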