Diffusion for Developers
Create real-time applications that deliver hyper-personalized data to millions of concurrent consumers.
Remove the limits on team productivity – rapidly develop and connect applications with your choice of language, protocols and APIs.
const session = await diffusion.connect({
host : "<HOST>",
port : "<PORT>",
principal : "<PRINCIPAL>",
credentials : "<PASSWORD>"
});
await session.topics.add('<TOPIC_PATH>', diffusion.topics.TopicType.JSON);
const value = { foo : "bar" };
await session.topicUpdate.set("<TOPIC_PATH>", diffusion.datatypes.json(), value);
const session = await diffusion.connect({
host : "<HOST>",
port : "<PORT>",
principal : "<PRINCIPAL>",
credentials : "<PASSWORD>"
});
session.addStream('<TOPIC_PATH>', diffusion.datatypes.json())
.on('value', function(topic, specification, newValue, oldValue) {
console.log('New value:', newValue.get());
});
await session.select('<TOPIC_PATH>');
var session = Diffusion.Sessions
.Principal("<PRINCIPAL>")
.Password("<PASSWORD>")
.Open("ws://<HOST>:<PORT>:");
await session.TopicControl.AddTopicAsync("<TOPIC_PATH>", TopicType.JSON);
var jsonDataType = Diffusion.DataTypes.JSON;
var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");
_ = session.TopicUpdate.SetAsync("<TOPIC_PATH>", value);
var session = Diffusion.Sessions
.Principal("<PRINCIPAL>")
.Password("<PASSWORD>")
.Open("ws://<HOST>:<PORT>:");
session.Topics.AddStream("<TOPIC_PATH>", new ExampleValueStream());
_ = await session.Topics.SubscribeAsync("<TOPIC_PATH>");
class ExampleValueStream : IValueStream<IJSON>
{
public void OnSubscription(string topicPath, ITopicSpecification specification)
{
Console.WriteLine($"Subscribed to: {topicPath}");
}
public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
{
Console.WriteLine($"Unsubscribed from: {topicPath}");
}
public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
{
Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
}
public void OnClose()
{
// Not used
}
public void OnError(ErrorReason errorReason)
{
// Not used
}
}
final Session session = Diffusion
.sessions()
.principal("<PRINCIPAL>")
.password("<PASSWORD>")
.open("ws://<HOST>:<PORT>");
session.feature(TopicControl.class).addTopic("<TOPIC_PATH>", TopicType.JSON);
session.feature(TopicUpdate.class).set("<TOPIC_PATH>", JSON.class,
Diffusion.dataTypes().json().fromJsonString("{\"hello\": \"world\"}"));
final Session session = Diffusion
.sessions()
.principal("<PRINCIPAL>")
.password("<PASSWORD>")
.open("ws://<HOST>:<PORT>");
session.feature(Topics.class)
.addStream("<TOPIC_PATH>", JSON.class, new Topics.ValueStream.Default<JSON>() {
@Override
public void onValue(
String topicPath,
TopicSpecification specification,
JSON oldValue,
JSON newValue) {
System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
}
@Override
public void onSubscription(
String topicPath,
TopicSpecification specification) {
System.out.println("on subscription: " + topicPath);
}
@Override
public void onUnsubscription(
String topicPath,
TopicSpecification specification,
UnsubscribeReason reason) {
System.out.println("on unsubscription: " + topicPath + ": " + reason.toString());
}
});
session.feature(Topics.class).subscribe("<TOPIC_PATH>");
let url = URL(string: "ws://<HOST>:<PORT>")!
let credentials = PTDiffusionCredentials(password: "<PASSWORD>")
let config = PTDiffusionSessionConfiguration(principal: "<PRINCIPAL>",
credentials: credentials)
let errorHandler: (Any?, Error?) -> Void = {response, error in
if (error != nil) {
print(error!)
}
}
PTDiffusionSession.open(with: url,
configuration: config) { (session, error) in
if (error != nil) {
print("An error has occurred: \(error!.localizedDescription)")
return
}
session!.topicControl.addTopic(withPath: "<PATH>",
type: PTDiffusionTopicType.JSON,
completionHandler: errorHandler)
let value = try! PTDiffusionJSON(object: ["foo": "bar"])
session!.topicUpdate.setWithPath("<PATH>",
toJSONValue: value) { error in errorHandler(nil, error) }
}
class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {
func diffusionStream(_ stream: PTDiffusionStream,
didUnsubscribeFromTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification,
reason: PTDiffusionTopicUnsubscriptionReason) {
print("Unsubscribed from: \(topicPath)")
}
func diffusionStream(_ stream: PTDiffusionStream,
didFailWithError error: Error) {
print("Failed with error: \(error)")
}
func diffusionDidClose(_ stream: PTDiffusionStream) {
print("Closed")
}
func diffusionStream(_ stream: PTDiffusionStream,
didSubscribeToTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification) {
print("Subscribed to: \(topicPath)")
}
func diffusionStream(_ stream: PTDiffusionValueStream,
didUpdateTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification,
oldJSON: PTDiffusionJSON?,
newJSON: PTDiffusionJSON) {
do {
let value:Dictionary<String, Any> = try newJSON.object() as! Dictionary
print("\(topicPath): \(value.description)")
}
catch {
print("Unable to read message")
}
}
}
let url = URL(string: "ws://<HOST>:<PORT>")!
let credentials = PTDiffusionCredentials(password: "<PASSWORD>")
let config = PTDiffusionSessionConfiguration(principal: "<PRINCIPAL>",
credentials: credentials)
let errorHandler: (Any?, Error?) -> Void = {response, error in
if (error != nil) {
print(error!)
}
}
PTDiffusionSession.open(with: url,
configuration: config) { (session, error) -> Void in
if (error != nil) {
print("An error has occurred: \(error!.localizedDescription)")
return
}
let delegate = StreamDelegate()
let selector = PTDiffusionTopicSelector(expression: "<PATH>")
let stream = PTDiffusionJSON.valueStream(with: delegate)
try! session!.topics.add(stream, with: selector, error: ())
session!.topics.subscribe(withTopicSelectorExpression: "<PATH>") { error in errorHandler(nil, error) }
}
// Define the callback functions
static int on_topic_creation_result(
DIFFUSION_TOPIC_CREATION_RESULT_T result,
void *context)
{
// topic has been added
return HANDLER_SUCCESS;
}
// Create a session factory and set principal and password
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "<PRINCIPAL>");
diffusion_session_factory_password(session_factory, "<PASSWORD>");
// Create a new session
SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>");
BUF_T *buf = buf_create();
write_diffusion_json_value("\"hello world\"", buf);
TOPIC_SPECIFICATION_T *spec = topic_specification_init(TOPIC_TYPE_JSON);
DIFFUSION_TOPIC_UPDATE_ADD_AND_SET_PARAMS_T topic_add_and_set_params = {
.topic_path = "<PATH>",
.specification = spec,
.datatype = DATATYPE_JSON,
.update = buf,
.on_topic_update_add_and_set = on_topic_creation_result
};
// Add a topic and set its value
diffusion_topic_update_add_and_set(session, topic_add_and_set_params);
// Define the callback functions
static int on_subscription(
const char* topic_path,
const TOPIC_SPECIFICATION_T *specification,
void *context)
{
// value stream is now subscribed to `topic_path`
return HANDLER_SUCCESS;
}
static int on_unsubscription(
const char* topic_path,
const TOPIC_SPECIFICATION_T *specification,
NOTIFY_UNSUBSCRIPTION_REASON_T reason,
void *context)
{
// value stream is now unsubscribed from `topic_path`
return HANDLER_SUCCESS;
}
static int on_value(
const char* topic_path,
const TOPIC_SPECIFICATION_T *const specification,
const DIFFUSION_DATATYPE datatype,
const DIFFUSION_VALUE_T *const old_value,
const DIFFUSION_VALUE_T *const new_value,
void *context)
{
// read the value update
return HANDLER_SUCCESS;
}
static void on_close()
{
// value stream has been closed
}
// Create a session factory and set principal and password
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "<PRINCIPAL>");
diffusion_session_factory_password(session_factory, "<PASSWORD>");
// Create a new session
SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>");
VALUE_STREAM_T value_stream = {
.datatype = DATATYPE_JSON,
.on_subscription = on_subscription,
.on_unsubscription = on_unsubscription,
.on_value = on_value,
.on_close = on_close
};
add_stream(session, "<PATH>", &value_stream);
SUBSCRIPTION_PARAMS_T params = {
.topic_selector = "<PATH>"
};
subscribe(session, params);
# Add a topic and set its value.
topic_type = diffusion.datatypes.STRING
value = "Value1"
async def main(
topic_selector="<TOPIC_PATH>",
host="<HOST>",
port="<PORT>",
principal="<PRINCIPAL>",
password="<PASSWORD>",
protocol="wss"
):
async with diffusion.Session(
url=f"{protocol}://{host}:{port}",
principal=f"{principal}",
credentials=diffusion.Credentials(f"{password}"),
) as session:
add_response = await session.topics.add_topic(topic_selector, topic_type)
if add_response == session.topics.CREATED:
print(f"Topic {topic_selector} successfully created.")
if add_response == session.topics.EXISTS:
print(f"Topic {topic_selector} already exists.")
await session.topics.set_topic(topic_selector, value, specification=topic_type)
print(f"Topic {topic_selector} successfully set to {value}")
# Subscribe to topics.
def on_update(*, old_value, topic_path, topic_value, **kwargs):
print("Topic:", topic_path)
if old_value is None:
print(" Initial value:", topic_value)
else:
print(" Value updated")
print(" Old value:", old_value)
print(" New value:", topic_value)
def on_subscribe(*, topic_path, **kwargs):
print(f"Subscribed to {topic_path}")
def on_unsubscribe(*, reason, topic_path, **kwargs):
print(f"Unsubscribed from {topic_path} because {str(reason)}")
topic_type = diffusion.datatypes.STRING
session_duration = 15
value_stream = diffusion.features.topics.ValueStreamHandler(
data_type=topic_type,
update=on_update,
subscribe=on_subscribe,
unsubscribe=on_unsubscribe,
)
async def main(
topic_selector="<TOPIC_PATH>",
host="<HOST>",
port="<PORT>",
principal="<PRINCIPAL>",
password="<PASSWORD>",
protocol="wss"
):
async with diffusion.Session(
url=f"{protocol}://{host}:{port}",
principal=f"{principal}",
credentials=diffusion.Credentials(f"{password}"),
) as session:
print("Adding value stream")
session.topics.add_value_stream(
topic_selector=topic_selector, stream=value_stream
)
print(f"Subscribing to {topic_selector}")
await session.topics.subscribe(topic_selector)
await asyncio.sleep(session_duration)
print(f"Unsubscribing from {topic_selector}")
await session.topics.unsubscribe(topic_selector)
Consume, Transform and Deliver Data with Intelligence and Ease
CONSUME
Quickly integrate any data source using pre-built adapters to simplify connecting data streams, or build your own with our fully-featured SDKs.
Adapters
- Poll REST data sources and expose as streaming data through the Diffusion topic tree.
- CDC (Change Data Capture) to stream updates from database tables, in real time.
- Stream data from Kafka topics, presenting your data at internet scale.
- Or write your own using our Java Gateway Framework.
Protocols
- MQTT Support enables direct connection of IoT devices using MQTT 5.0.
- REST API is ideal for one-off updates or data snapshots, suitable for low-power devices and can be called with any language capable of HTTP requests.
SDKs
- Use the full power of the Diffusion API to integrate with your services in a wide range of programming languages.
TRANSFORM
Diffusion tranforms data in-flight, processes and segments data in real time, stores event streams for querying and editing and streams real-time data from your clients’ apps.
Hyper-Personalize Data
- Easily transform and map incoming data with low-code topic views—or implement advanced application logic with control clients.
- Aggregate multiple incoming values into one topic to turn a clickstream into a customer.
- Expand a single incoming data point and generate subtopics to make a list of prices into markets.
- Tailor data for delivery to each service or region and even provide a custom feed for each end user.
TIME SERIES
- Store a time-stamped event within a single topic.
- Stream events as they happen or query to retrieve part of a series.
- Enable non-destructive updates to update a posted event while maintaining a full audit trail
Delayed Feed
- Topic views can be used to create a delayed feed and are ideal for creating lower-value versions of time-sensitive data. You can replay every data change with custom delays of as little as one second or multiple days.
DELIVER
Diffusion features patented delta streaming technology that minimizes costs, controls data access down to individual end users to maximize security, and uses remote topic views to efficiently distribute data across a geographically dispersed user base.
Delta Streaming
- Intelligently distinguishes between old, updated and new data, only sending recent, relevant, information to clients instead of the entire topic content. 90% reduction in server and bandwidth requirements is achieved by avoiding the need to send data that isn’t changing from one markup to the next.
Fine-Grained Security
- Fine-grained dynamic security permissions can be provided for each topic.
- Unique security permissions can be assigned to each user, scalable to hundreds of thousands of users.
- Access changes are applied immediately giving you real-time control of what data each user can see.
Remote Topic Views
- Copy all or part of the topic tree on one server to another.
- Ideal for distributing data across a geographically dispersed user base.
- Minimize latency and bandwidth by serving data from a local server.
- Easy set up and modification from the Diffusion monitoring console or via SDKs.
Diffusion SDKs
Available for immediate download. Need help or direction? Book a call with one of our architects.
Learn About the Diffusion Intelligent Data Platform
Whether you’re using Diffusion now or just getting your feet wet, our learning resources are available to help anytime.
Support
Need a hand or a few pointers? Contact our support team.