Expanded Feature Support In The Diffusion C Client

Diffusion 6.1 and 6.2 added many functions and capabilities to the Diffusion C client that were previously only available in other languages:

  • Adding topics through topic specifications (added in 6.1)
  • Support for string, int64, double and recordV2 topic types (added in 6.1)
  • Value stream API support (added in 6.1)
  • Enhanced topic update (added in 6.2)
  • Request/response messaging (added in 6.2)
  • Authenticator API support (added in 6.2)

Adding topics through topic specifications

You can now add topics and set their properties using topic specifications.

void add_topic_from_specification(SESSION_T *session, const char *topic_path, const TOPIC_SPECIFICATION_T *specification, const ADD_TOPIC_CALLBACK_T callback);

For example, to create a JSON topic specification with no properties:

TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);

To create a JSON topic specification with topic property DIFFUSION_VALIDATE_VALUES set to true:

HASH_T *properties = hash_new(1);

hash_add(properties, DIFFUSION_VALIDATE_VALUES, "true");

TOPIC_SPECIFICATION_T *specification = topic_specification_with_properties(TOPIC_TYPE_JSON, properties);

ADD_TOPIC_CALLBACK_T consists of function pointers to callback functions (much like ADD_TOPIC_HANDLERS for ADD_TOPIC_PARAMS_T)

typedef int (*on_topic_add_cb)(SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context);

typedef int (*on_topic_add_failed_cb)(SESSION_T *session, TOPIC_ADD_FAIL_RESULT_CODE result_code, const DIFFUSION_ERROR_T *error, void *context);

Examples showing how to use this can can be found in the add-topics.c example code that is installed with Diffusion.

Support for string, int64, double and recordV2 topic types

The Diffusion C client now allows applications to add, update and subscribe to topics of these types:

  • string
  • int64
  • double
  • recordV2

The only way to add topics of these types is through topic specifications (detailed above).

Subscribing to topics of these types remains the same, though we now encourage using value streams as opposed to topic message handlers.

Producing update values for these topic types has now been made simpler. Each of these topic data types now has its own write function:

bool write_diffusion_string_value(const char *value, const BUF_T *buf);

bool write_diffusion_int64_value(int64_t value, const BUF_T *buf);

bool write_diffusion_double_value(double value, const BUF_T *buf);

bool write_diffusion_recordv2_value(const void *recordv2, const BUF_T *buf);

The bool returned will be true if the value was successfully written into the BUF_T. Otherwise, it will be false.

The BUF_T can then be used as part of the newly introduced DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T struct:

BUF_T *buf = buf_create();

if(!write_diffusion_string_value("hello world", buf)) {
        // handle error
        return EXIT_FAILURE;
}

DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T topic_update_params = {
    .topic_path = topic_path,
    .datatype = DATATYPE_STRING,
    .update = buf,
    .on_topic_update = on_topic_update
};

diffusion_topic_update_set(session, topic_update_params);

Value stream API

The value stream API provides greater flexibility when dealing with topic subscriptions and the subsequent values that they provide.

The callback functions are (located in value-stream.h):

typedef int (*value_stream_on_subscription)(const char *const topic_path, const TOPIC_SPECIFICATION_T *specification, void *context);

typedef int (*value_stream_on_unsubscription)(const char *const topic_path, const TOPIC_SPECIFICATION_T *const specification,
                                              NOTIFY_UNSUBSCRIPTION_REASON_T reason, void *context);

typedef int (*value_stream_on_value)(const char *const topic_path, const TOPIC_SPECIFICATION_T *const specification,
                                     DIFFUSION_DATATYPE datatype, const DIFFUSION_VALUE_T *const old_value, const DIFFUSION_VALUE_T *const new_value,
                                     void *context);

typedef void (*value_stream_on_close)(void);

typedef void (*value_stream_on_error)(const DIFFUSION_ERROR_T *error);

Functions using value streams are (located in streams.h):

VALUE_STREAM_HANDLE_T *add_stream(SESSION_T *session, const char *topic_selector, const VALUE_STREAM_T *value_stream);

VALUE_STREAM_HANDLE_T *add_fallback_stream(SESSION_T *session, const VALUE_STREAM_T *value_stream);

void remove_stream(SESSION_T *session, const VALUE_STREAM_HANDLE_T *handle);

Value streams are now the preferred way to receive topic subscription/unsubscription notifications and streaming topic values.

Value streams are not compatible with legacy topics (such as stateless, single value, or record topics).

As value streams provide subscription and value notification, the SUBSCRIPTION_PARAMS_T.on_topic_message sub-callback is now obsolete. If a value stream matching the topic to be subscribed to is added, then the on_topic_message sub-callback pointer value can simply be set to NULL:

VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_STRING,
        .on_subscription = on_subscription,
        .on_unsubscription = on_unsubscription,
        .on_value = on_value,
        .on_error = on_error
};

add_stream(session, "foo", &value_stream);

SUBSCRIPTION_PARAMS_T params = {
        .topic_selector = "foo",
        .on_topic_message = NULL
};

subscribe(session, params);

The values passed into the callback functions should not be freed, as their memory is internally managed by the API.

The value stream API supports delta streaming, providing an improvement over how topic updates were previously observed. Each update gives applications visibility over the current and previous value at a particular topic.

Enhanced topic update

The C client has an enhanced update API, referenced in topic-update.h:

Key enhancements are:

  • Updates can be performed with constraints against them – where the topic update will only be enacted if the constraint is passed. Update constraints are located in update-constraint.h
  • Topic updates can be performed against topics that don’t yet exist, with the add_and_set functions. This function will add the topic (if it doesn’t exist) and perform the update. If the topic already exists, it will be updated.
typedef struct diffusion_topic_update_params_s {
        /// Topic path to be updated
        const char *topic_path;
        /// The topic update datatype
        DIFFUSION_DATATYPE datatype;
        /// Update value. Can be `NULL` for the following
        /// datatypes: `DATATYPE_STRING`,`DATATYPE_INT64`
        /// or `DATATYPE_DOUBLE`
        BUF_T *update;
        /// Callback when the topic update is successful
        on_topic_update_set_success_cb on_topic_update;
        /// Callback to handle errors. Can be NULL.
        ERROR_HANDLER_T on_error;
        /// Callback to handle. Can be NULL.
        DISCARD_HANDLER_T on_discard;
        /// User-supplied context returned to callbacks.
        void *context;
} DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T;

typedef struct diffusion_topic_update_add_and_set_params_s {
        /// Topic path to be added
        const char *topic_path;
        /// The topic's specification
        TOPIC_SPECIFICATION_T *specification;
        /// The topic update datatype
        DIFFUSION_DATATYPE datatype;
        /// Update value. Can be `NULL` for the following
        /// datatypes: `DATATYPE_STRING`,`DATATYPE_INT64`
        /// or `DATATYPE_DOUBLE`
        BUF_T *update;
        /// Callback when the topic add and set is successful
        on_topic_update_topic_creation_cb on_topic_update_add_and_set;
        /// Callback to handle errors. Can be NULL.
        ERROR_HANDLER_T on_error;
        /// Callback to handle. Can be NULL.
        DISCARD_HANDLER_T on_discard;
        /// User-supplied context returned to callbacks.
        void *context;
} DIFFUSION_TOPIC_UPDATE_ADD_AND_SET_PARAMS_T;
void diffusion_topic_update_set(SESSION_T *session, DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T params);

void diffusion_topic_update_set_with_constraint(SESSION_T *session, const DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *constraint, DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T params);

void diffusion_topic_update_add_and_set(SESSION_T *session, DIFFUSION_TOPIC_UPDATE_ADD_AND_SET_PARAMS_T params);

void diffusion_topic_update_add_and_set_with_constraint(SESSION_T *session, const DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *constraint, DIFFUSION_TOPIC_UPDATE_ADD_AND_SET_PARAMS_T params);

DIFFUSION_TOPIC_UPDATE_STREAM_T *diffusion_topic_update_create_update_stream(SESSION_T *session, const char *topic_path, DIFFUSION_DATATYPE datatype);

DIFFUSION_TOPIC_UPDATE_STREAM_T *diffusion_topic_update_create_update_stream_with_constraint(SESSION_T *session, const DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *constraint, const char *topic_path, DIFFUSION_DATATYPE datatype);

DIFFUSION_TOPIC_UPDATE_STREAM_T *diffusion_topic_update_create_update_stream_adding_topic(SESSION_T *session, const char *topic_path, const TOPIC_SPECIFICATION_T *specification,
                                                                                          DIFFUSION_DATATYPE datatype, DIFFUSION_TOPIC_UPDATE_STREAM_PARAMS_T params);

DIFFUSION_TOPIC_UPDATE_STREAM_T *diffusion_topic_update_create_update_stream_adding_topic_with_constraint(SESSION_T *session, const DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *constraint, const char *topic_path,
                                                                                                          const TOPIC_SPECIFICATION_T *specification, DIFFUSION_DATATYPE datatype, DIFFUSION_TOPIC_UPDATE_STREAM_PARAMS_T params);
typedef int (*on_topic_update_set_success_cb)(void *context);

typedef int (*on_topic_update_topic_creation_cb)(DIFFUSION_TOPIC_CREATION_RESULT_T result, void *context);

These update functions are the preferred alternative to updating using the topic-update-control.h feature. Topic update streams also have their own API functions located in update-stream.h which provide even more flexibility around performing topic updates.

Request/response messaging

Request/response messaging is provided as an enhancement to the C client’s classic one-way messaging. These functions can be found in messaging.h and messaging-control.h.

typedef struct send_request_params_s {
        /// Path to send the request to
        const char *path;
        /// Request to be sent
        BUF_T *request;
        /// The datatype of the request
        DIFFUSION_DATATYPE request_datatype;
        /// The datatype of the response to be received
        DIFFUSION_DATATYPE response_datatype;
        /// Callback to handle the response
        request_on_response_cb on_response;
        /// Callback to handle errors. Can be NULL.
        ERROR_HANDLER_T on_error;
        /// Callback to handle. Can be NULL.
        DISCARD_HANDLER_T on_discard;
        /// User-supplied context. Can be NULL.
        void *context;
} SEND_REQUEST_PARAMS_T;

typedef struct send_request_to_session_params_s {
        /// The session id of the session to receive the request.
        SESSION_ID_T *recipient_session;
        /// The request path used by the recipient to select an appropriate handler.
        const char *path;
        /// The request to send
        BUF_T *request;
        /// The datatype of the request
        DIFFUSION_DATATYPE request_datatype;
        /// The datatype of the response to be received
        DIFFUSION_DATATYPE response_datatype;
        /// Callback to handle the response
        request_on_response_cb on_response;
        /// Callback to handle errors. Can be NULL.
        ERROR_HANDLER_T on_error;
        /// Callback to handle. Can be NULL.
        DISCARD_HANDLER_T on_discard;
        /// User-supplied context. Can be NULL.
        void *context;
} SEND_REQUEST_TO_SESSION_PARAMS_T;

typedef struct send_request_to_filter_params_s {
        /// The request path to send the request to.
        const char *path;
        /// The session filter expression.
        const char *filter;
        /// The datatype of the request
        DIFFUSION_DATATYPE request_datatype;
        /// The datatype of the response to be received
        DIFFUSION_DATATYPE response_datatype;
        /// Request to send
        BUF_T *request;
        /// Callback when the request has been dispatched to
        /// all matching sessions.
        filtered_request_number_sent on_number_sent;
        /// Callback when a response is received
        filtered_request_on_response on_response;
        /// Callback when an error response is received
        filtered_request_on_response_error on_response_error;
        /// Callback to handle errors. Can be NULL.
        ERROR_HANDLER_T on_error;
        /// Callback to handle. Can be NULL.
        DISCARD_HANDLER_T on_discard;
        /// User supplied context. Can be NULL.
        void *context;
} SEND_REQUEST_TO_FILTER_PARAMS_T;
void send_request(SESSION_T *session, SEND_REQUEST_PARAMS_T params);

DIFFUSION_REQUEST_STREAM_T *set_request_stream(SESSION_T *session, const char *path,
                        DIFFUSION_DATATYPE request_datatype, DIFFUSION_DATATYPE response_datatype, const DIFFUSION_REQUEST_STREAM_T *request_stream);

DIFFUSION_REQUEST_STREAM_T *remove_request_stream(SESSION_T *session, const char *request_path);

void send_request_to_session(SESSION_T *session, SEND_REQUEST_TO_SESSION_PARAMS_T params);

void send_request_to_filter(SESSION_T *session, SEND_REQUEST_TO_FILTER_PARAMS_T params);

void add_request_handler(SESSION_T *session, ADD_REQUEST_HANDLER_PARAMS_T params);
typedef int (*request_stream_on_request_cb)(SESSION_T *session, const char *path, DIFFUSION_DATATYPE request_datatype,
                                            const DIFFUSION_VALUE_T *request, const DIFFUSION_RESPONDER_HANDLE_T *responder_handle, void *context);

typedef int (*request_handler_on_request)(SESSION_T *session,  DIFFUSION_DATATYPE request_datatype, const DIFFUSION_VALUE_T *request,
                                          const DIFFUSION_REQUEST_CONTEXT_T *request_context, const DIFFUSION_RESPONDER_HANDLE_T *handle, void *context);

typedef int (*request_on_response_cb)(DIFFUSION_DATATYPE response_datatype, const DIFFUSION_VALUE_T *response, void *context);

typedef int (*filtered_request_on_response)(DIFFUSION_DATATYPE response_datatype, const DIFFUSION_VALUE_T *response, void *context);

typedef int (*filtered_request_on_response_error)(const SESSION_ID_T *session_id, const DIFFUSION_ERROR_T *error);

An important advantage of request/response messaging over one-way messaging is its ability to provide application level bidirectional messaging as an API function. One-way messaging’s “fire-and-forget” model doesn’t provide this.

Examples on how to use request/response messaging are located in the Diffusion installation examples folder, within send-request-to-filter.c, send-request-to-path.c and send-request-to-session.c

Authenticator API

The C client’s prior authentication mechanism is now deprecated in favor of the authenticator API. This provides:

  • More information on each authentication request, including session properties and proposed session properties of the authentication request.
  • The ability to allow an authentication request with mandated session properties.

These functions are located in authentication-control.h and authenticator.h

typedef struct diffusion_authentication_handler_s {
        /// Name of the authentication handler
        char *handler_name;
        /// Callback when the authentication handler is
        /// active.
        authenticator_on_active on_active;
        /// Callback when a connection request to be
        /// authenticated has been received.
        authenticator_on_authenticate on_authenticate;
        /// Callback when the authentication handler
        /// encounters an error.
        authenticator_on_error on_error;
        /// Callback when an authentication handler
        /// is closed.
        authenticator_on_close on_close;
} DIFFUSION_AUTHENTICATION_HANDLER_T;
void diffusion_set_authentication_handler(SESSION_T *session, const DIFFUSION_AUTHENTICATION_HANDLER_PARAMS_T params);

bool diffusion_authenticator_allow(SESSION_T *session, const DIFFUSION_AUTHENTICATOR_T *authenticator, DIFFUSION_API_ERROR *error);

bool diffusion_authenticator_allow_with_properties(SESSION_T *session, const DIFFUSION_AUTHENTICATOR_T *authenticator, const HASH_T *properties, DIFFUSION_API_ERROR *error);

bool diffusion_authenticator_abstain(SESSION_T *session, const DIFFUSION_AUTHENTICATOR_T *authenticator, DIFFUSION_API_ERROR *error);

bool diffusion_authenticator_deny(SESSION_T *session, const DIFFUSION_AUTHENTICATOR_T *authenticator, DIFFUSION_API_ERROR *error);

DIFFUSION_AUTHENTICATOR_T *diffusion_authenticator_dup(const DIFFUSION_AUTHENTICATOR_T *authenticator);

void diffusion_authenticator_free(DIFFUSION_AUTHENTICATOR_T *authenticator);

Authentication requests are routed to user defined callbacks:

typedef int (*authenticator_on_authenticate)(SESSION_T *session, const char *principal, const CREDENTIALS_T *credentials,
                                             const HASH_T *session_properties, const HASH_T *proposed_session_properties,
                                             const DIFFUSION_AUTHENTICATOR_T *authenticator);

The authenticator handle can be duplicated with diffusion_authenticator_dup and used in a separate thread. This can be useful if your solution invokes a devolved authentication mechanism, and you want the control client application to continue uninterrupted in another thread.

We are continuing to work hard to bring all the latest Diffusion features into C, enabling you to create the highest performance real-time applications.


Further reading

BLOG

Exploring Generative AI: Opportunity or Potential Headache?

March 25, 2024

Read More about Exploring Generative AI: Opportunity or Potential Headache?/span>

The Diffusion Data logo

BLOG

Creating a WebSocket Server for PubSub

June 28, 2024

Read More about Creating a WebSocket Server for PubSub/span>

The Diffusion Data logo

BLOG

100 million updates per second - Landmark Diffusion cluster performance

July 02, 2024

Read More about 100 million updates per second - Landmark Diffusion cluster performance/span>