Data Aggregation

Overview

The data aggregation module provides a flexible mechanism for monitoring, collecting, and publishing updates to a collection of data items or struct members. It tracks updates to individual data items, aggregates them, and triggers publication based on configurable timing and update policies. This is particularly useful for aggregating data scattered across various sources, such as sensor data from ADCs and the CAN bus, that must be collected and sent at regular intervals or upon change. The module can be added to any application by enabling the CONFIG_NTURT_MSG Kconfig option.

Core concepts

Timing Parameters

There are three timing parameters that control the behavior of the aggregation:

  • Period: The typical interval at which the aggregated data is published.

  • Minimum Separation: The minimum time between two consecutive publications.

  • Watermark: The additional time to wait for late-arriving data.

Figure: Timing parameters for data aggregation.

Normally, the aggregated data is published only after every data item in the aggregation has been updated. However, if Period has elapsed since the last publication and some data items have still not been updated, the aggregation waits an additional Watermark for late-arriving data. If some data items are still not updated after that, the aggregated data is published anyway, using the last known values of those data items.
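The publication rule above can be sketched as a pure function. This is a minimal model under stated assumptions, not the module's implementation: all names are hypothetical, times are plain millisecond integers, and Minimum Separation and the dormant state are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical timing configuration for this sketch; times are in ms. */
struct agg_timing_sketch {
    int64_t period_ms;
    int64_t watermark_ms;
};

/* Return true if the aggregation should publish at now_ms.
 * updated:  bitmask of members updated since the last publication.
 * required: bitmask of members that must be updated (cf. agg::fully_updated). */
static bool should_publish(const struct agg_timing_sketch *t, int64_t last_pub_ms,
                           int64_t now_ms, uint32_t updated, uint32_t required)
{
    assert(now_ms >= last_pub_ms);

    /* Every required member has been updated: publish right away. */
    if ((updated & required) == required) {
        return true;
    }

    /* Otherwise wait Period plus Watermark, then publish with stale values. */
    return now_ms - last_pub_ms >= t->period_ms + t->watermark_ms;
}
```

With a Period of 100 ms and a Watermark of 20 ms, a member that is still missing at 110 ms delays publication until 120 ms, after which the last known value is used.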

If updates to specific data items are not required, set the AGG_MEMBER_FLAG_IGNORED or AGG_MEMBER_FLAG_OPTIONAL flag in AGG_MEMBER (or in the member flags passed to AGG_DEFINE) to mark the member as ignored or optional.
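One way to picture the effect of these flags is a bitmask of the members the aggregation waits for, similar in spirit to agg::fully_updated. In this sketch the flag values and function name are hypothetical, and it does not model the one difference between the two flags (an update to an optional member still cold starts a dormant aggregation).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical values; the real AGG_MEMBER_FLAG_* constants may differ. */
#define SKETCH_FLAG_IGNORED  (1u << 0)
#define SKETCH_FLAG_OPTIONAL (1u << 1)

/* Build the mask of members the aggregation waits for, one bit per member.
 * Ignored and optional members are left out of the mask. */
static uint32_t required_mask(const uint8_t *member_flags, size_t num_member)
{
    assert(num_member <= 32); /* one bit per member in a uint32_t */

    uint32_t mask = 0;
    for (size_t i = 0; i < num_member; i++) {
        if (!(member_flags[i] & (SKETCH_FLAG_IGNORED | SKETCH_FLAG_OPTIONAL))) {
            mask |= 1u << i;
        }
    }
    return mask;
}
```

For four members flagged { 0, OPTIONAL, 0, IGNORED }, only members 0 and 2 are waited for, giving a mask of 0x5.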

Because the module is designed for processing real-time data, the aggregated data is published as soon as all data items are updated, while Minimum Separation keeps publications from happening too frequently. Hence the aggregated data can only be published within the Publish Interval, which spans from Minimum Separation to Period plus Watermark after the previous publication.
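The interaction between "publish as soon as ready" and Minimum Separation can be sketched as clamping the ready time to the start of the Publish Interval. The function name is hypothetical and times are plain milliseconds; this only illustrates the rule, not how the module's timers implement it.

```c
#include <assert.h>
#include <stdint.h>

/* Return when a publication that became ready at ready_ms actually happens,
 * given the previous publication at last_pub_ms. Times are in ms. */
static int64_t publish_time(int64_t last_pub_ms, int64_t ready_ms, int64_t min_sep_ms)
{
    assert(ready_ms >= last_pub_ms);

    /* Never publish earlier than Minimum Separation after the last one. */
    int64_t earliest = last_pub_ms + min_sep_ms;
    return ready_ms > earliest ? ready_ms : earliest;
}
```

With a Minimum Separation of 10 ms, data that is complete 4 ms after the last publication is held until 10 ms, while data completed at 30 ms is published at 30 ms.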

Dormant and Cold Start

If Period plus Watermark has elapsed and no data items have been updated, the aggregation becomes dormant, meaning it stops publishing until any of its data items is updated again. This avoids unnecessary publications when the source modules are inactive or the data is not changing.

When an update to any data item (including one marked with AGG_MEMBER_FLAG_OPTIONAL) cold starts a dormant aggregation, the aggregation waits only Watermark for the other data items to be updated before publishing.

The dormant state can be disabled by setting the AGG_FLAG_ALWAYS_PUBLISH flag in AGG_DEFINE or AGG_TYPED_DEFINE, which forces the aggregation to publish every Period plus Watermark even if no data items are updated.
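The dormant, cold-start, and always-publish rules can be sketched as a tiny state machine. Everything here is hypothetical (the struct, the two handlers, and millisecond timestamps); it restates the rules above and is not the module's timer logic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical scheduling state for this sketch only; times are in ms. */
struct agg_sched_sketch {
    int64_t period_ms;
    int64_t watermark_ms;
    bool always_publish; /* stands in for AGG_FLAG_ALWAYS_PUBLISH */
    bool dormant;
};

/* Called when Period plus Watermark has elapsed since the last publication.
 * Returns true if the aggregation publishes (possibly with stale values),
 * false if it goes dormant instead. */
static bool on_deadline(struct agg_sched_sketch *s, uint32_t updated)
{
    if (updated == 0 && !s->always_publish) {
        s->dormant = true;
        return false;
    }
    return true;
}

/* Called on any member update, optional members included. Returns the
 * deadline by which the aggregation will publish. */
static int64_t on_update(struct agg_sched_sketch *s, int64_t now_ms, int64_t last_pub_ms)
{
    assert(now_ms >= last_pub_ms);

    if (s->dormant) {
        /* Cold start: wait only Watermark for the remaining members. */
        s->dormant = false;
        return now_ms + s->watermark_ms;
    }
    return last_pub_ms + s->period_ms + s->watermark_ms;
}
```

In this model, an aggregation with a 100 ms Period and 20 ms Watermark that went dormant and then sees an update at 500 ms publishes by 520 ms, rather than waiting a full Period plus Watermark.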

Note

Currently, the aggregation does not start automatically even if AGG_FLAG_ALWAYS_PUBLISH is set; it only starts when the first data item is updated. This may change in the future so that the aggregation starts automatically after initialization.

Data Update

If a data item is updated multiple times before the next publication, only the latest value is published. This ensures that the aggregated data reflects the most recent state of the system. If a data item is not updated before the next publication, its last known value is used.
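The "latest value wins, otherwise last known value" behavior follows naturally from a double buffer like the data and pub_data members documented for struct agg_typed. The sketch below is a hypothetical simplification with a fixed message type: updates overwrite the working buffer, and publishing copies it into the publish buffer without clearing it.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical message type, loosely mirroring struct my_msg in Usage. */
struct msg_sketch {
    int foo;
    float x;
};

/* Illustrative double buffer in the spirit of agg_typed::data / pub_data. */
struct buf_sketch {
    struct msg_sketch data;     /* written by updates */
    struct msg_sketch pub_data; /* handed to the publish callback */
    uint32_t updated;           /* one bit per member */
};

static void update_foo(struct buf_sketch *b, int v)
{
    b->data.foo = v; /* overwrite: only the latest value survives */
    b->updated |= 1u << 0;
}

/* Snapshot the latest values and start a new round. Members that were not
 * updated keep their last known value because data is never cleared. */
static const struct msg_sketch *snapshot(struct buf_sketch *b)
{
    memcpy(&b->pub_data, &b->data, sizeof(b->pub_data));
    b->updated = 0;
    return &b->pub_data;
}
```

Updating foo to 1 and then 2 before a snapshot publishes only 2, while x keeps whatever value it last had.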

Runtime Behavior

  • Execution Context: The aggregated data is published from the Zephyr system work queue, a high-priority thread intended for deferred work such as the bottom half of interrupt handlers. Because the system work queue is shared with other kernel and driver work items, the publish function should be fast and non-blocking.
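A common way to keep a publish function non-blocking is to hand the data off with a "try" operation that fails immediately when the destination is full, instead of waiting. In Zephyr, k_msgq_put() with a K_NO_WAIT timeout behaves this way; the portable sketch below uses a hypothetical fixed-size queue to show the same drop-instead-of-wait policy.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_LEN 4 /* hypothetical capacity */

/* Hypothetical bounded queue standing in for whatever the publish callback
 * hands its data to. */
struct queue_sketch {
    int items[QUEUE_LEN];
    size_t head;
    size_t count;
};

/* Never waits: if the queue is full, the item is dropped and the function
 * returns immediately, so the caller never stalls the work queue. */
static bool try_put(struct queue_sketch *q, int item)
{
    if (q->count == QUEUE_LEN) {
        return false;
    }
    q->items[(q->head + q->count) % QUEUE_LEN] = item;
    q->count++;
    return true;
}
```

Dropping a sample under load trades completeness for bounded latency, which usually suits periodic sensor data where a newer value will follow shortly.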

Usage

Defining an Aggregation

The aggregation module can be used in two main ways:

  • For external or unrelated data items, use struct agg, defined with AGG_DEFINE or initialized with AGG_INITIALIZER inside another struct. Data items are then identified and updated by index, which suits data items that are not part of a single struct.

  • For struct members, use AGG_TYPED_DEFINE to define a typed aggregation. This macro sets up an aggregation for a specific struct type, allowing you to monitor and update individual members. This is useful for aggregating updates to the fields of a message type or other data structure.

Suppose we have a struct representing a message:

struct my_msg {
    int foo;
    struct {
        float x;
        float y;
    } bar;
};

An aggregation can be defined to monitor updates to these members:

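void my_publish_func(const void *data, void *user_data);  // forward declaration of the publish callback

AGG_TYPED_DEFINE(my_msg_agg, struct my_msg,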
    AGG_DATA_INIT({0, {0.0f, 0.0f}}),              // initial value
    K_MSEC(100),                                   // period
    K_MSEC(10),                                    // minimum separation
    K_MSEC(20),                                    // watermark
    0,                                             // aggregation flags
    my_publish_func,                               // publish callback
    NULL,                                          // user data for the callback
    AGG_MEMBER(foo),                               // members to monitor
    AGG_MEMBER(bar.x, AGG_MEMBER_FLAG_OPTIONAL)
);

Note

Not all members need to be monitored. However, a member that is not monitored keeps its initial value in every publication.

Updating Members

To signal that a member has been updated, use agg_update() for struct agg or AGG_TYPED_UPDATE for typed aggregations:

AGG_TYPED_UPDATE(&my_msg_agg, struct my_msg, foo, 42);
AGG_TYPED_UPDATE(&my_msg_agg, struct my_msg, bar.x, 3.14F);

Warning

Only members declared in AGG_TYPED_DEFINE can be updated. If an unknown member is updated, an assertion fails at runtime.
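struct agg_typed documents a map from a member's offset within the type to its data index in agg, which is how an update by member name can be turned into an update by index. The sketch below assumes a small lookup table keyed by offsetof(); the table layout and function name are hypothetical and may differ from how the module actually stores the map. An undeclared member hits the assertion, mirroring the warning above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Copy of struct my_msg from the Usage section. */
struct my_msg {
    int foo;
    struct {
        float x;
        float y;
    } bar;
};

/* Hypothetical offset -> index entry; only foo and bar.x are monitored,
 * as in the AGG_TYPED_DEFINE example. */
struct member_entry {
    size_t offset;
    int idx;
};

static const struct member_entry monitored[] = {
    { offsetof(struct my_msg, foo), 0 },
    { offsetof(struct my_msg, bar.x), 1 },
};

/* Look up the data index of a member by its offset within struct my_msg. */
static int member_index(size_t offset)
{
    for (size_t i = 0; i < sizeof(monitored) / sizeof(monitored[0]); i++) {
        if (monitored[i].offset == offset) {
            return monitored[i].idx;
        }
    }
    assert(0 && "member not declared in AGG_TYPED_DEFINE");
    return -1;
}
```

Under this assumption, updating bar.x would write the new value into the working buffer and then mark index 1 as updated, while updating bar.y would trip the assertion.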

Publish Function

The aggregation module publishes the aggregated data by calling a user-defined publish function of type agg_publish_t for struct agg or agg_typed_publish_t for typed aggregations. For the typed aggregation defined above:

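#include <nturt/msg/aggregation.h>

void my_publish_func(const void *data, void *user_data) {
    const struct my_msg *msg = data;  // typed view of the aggregated data
    (void)user_data;                  // NULL in the AGG_TYPED_DEFINE example

    // publish msg here; keep this fast and non-blocking
}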
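The user_data pointer given to AGG_TYPED_DEFINE is passed back unchanged to every call of the publish function, so it can carry per-aggregation context. This standalone sketch calls a function of the same shape as agg_typed_publish_t directly, without Zephyr; struct pub_ctx and the counting logic are hypothetical, and in a real build &ctx would be passed as the user-data argument instead of NULL.

```c
#include <assert.h>

/* Copy of struct my_msg from the Usage section. */
struct my_msg {
    int foo;
    struct {
        float x;
        float y;
    } bar;
};

/* Same signature as agg_typed_publish_t. */
typedef void (*publish_fn)(const void *data, void *user_data);

/* Hypothetical context passed as user_data. */
struct pub_ctx {
    unsigned int count;
    int last_foo;
};

static void my_publish_func(const void *data, void *user_data)
{
    const struct my_msg *msg = data; /* typed view of the published buffer */
    struct pub_ctx *ctx = user_data; /* context supplied at definition time */

    ctx->count++;
    ctx->last_foo = msg->foo;
}
```

Because the callback runs in the system work queue, any context it shares with other threads would need its own synchronization.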

API Reference

group Aggregation

Data aggregation.

Defines

AGG_FLAG_ALWAYS_PUBLISH

Flag indicating the aggregation will always publish the data, even if no members are updated.

AGG_MEMBER_FLAG_IGNORED

Flag indicating the aggregation will not monitor the update of the member.

AGG_MEMBER_FLAG_OPTIONAL

Flag indicating the aggregation will not wait for the member to be updated before publishing. However, updates to the member will cold start the aggregation from dormant.

AGG_INITIALIZER(_obj, _name, _period, _min_separation, _watermark, _flag, _publish, _user_data, ...)

Static initializer for a data aggregation. Refer to AGG_DEFINE for detailed parameter descriptions.

Parameters:
  • _obj[in] Object to be initialized.

AGG_DEFINE(name, period, min_separation, watermark, flag, publish, user_data, ...)

Define a data aggregation named name to monitor the update of data. May be specified as static to limit the scope of the aggregation.

Parameters:
  • name[in] Name of the aggregation.

  • period[in] Period of data publishing.

  • min_separation[in] Minimum separation time between two consecutive publications.

  • watermark[in] Watermark to wait for late-arriving members.

  • flag[in] Flag of the aggregation. If no flag is required, 0 should be specified, and multiple flags can be combined by using the bitwise OR operator (|).

  • publish[in] Function to publish the data, must be of type agg_publish_t.

  • user_data[in] Pointer to custom data for the callback.

  • ...[in] Flags of the data items to be monitored, one entry per data item. If a data item requires no flag, 0 should be specified. Multiple flags can be combined by using the bitwise OR operator (|).

AGG_MEMBER(member, ...)

Specify a member of a struct to be monitored for aggregation. Used in AGG_TYPED_DEFINE.

Parameters:
  • member[in] Member of the struct to be monitored.

  • ...[in] Optional flags of the member, multiple flags can be specified by using the bitwise OR operator (|).

AGG_DATA_INIT(val, ...)

Initial value of the data. Used in AGG_TYPED_DEFINE.

Parameters:
  • val[in] Initialization list of the data.

Returns:

Initial value of the data.

AGG_TYPED_DEFINE(_name, _type, _init_val, _period, _min_separation, _watermark, _flag, _publish, _user_data, ...)

Define a data aggregation named _name to monitor the update of members within a data type. May be specified as static to limit the scope of the aggregation.

Parameters:
  • _name[in] Name of the aggregation.

  • _type[in] Data type to be monitored.

  • _init_val[in] Initial value of the data, must be specified with AGG_DATA_INIT.

  • _period[in] Period of data publishing.

  • _min_separation[in] Minimum separation time between two consecutive publications.

  • _watermark[in] Watermark to wait for late-arriving members.

  • _flag[in] Flag of the aggregation. If no flag is required, 0 should be specified, and multiple flags can be combined by using the bitwise OR operator (|).

  • _publish[in] Function to publish the data, must be of type agg_typed_publish_t.

  • _user_data[in] Pointer to custom data for the callback.

  • ...[in] Members of _type to be monitored, must be specified by AGG_MEMBER.

AGG_TYPED_UPDATE(agg_typed, type, member, value)

Update a member of the data.

Parameters:
  • agg_typed[inout] Pointer to the typed data aggregation.

  • type[in] Type of the data, must be the same as the type specified in AGG_TYPED_DEFINE.

  • member[in] Member of the data to be updated, must be listed in AGG_TYPED_DEFINE.

  • value[in] New value of the member.

Typedefs

typedef void (*agg_publish_t)(struct agg *agg, void *user_data)

Function to publish the data.

Param agg:

[in] Pointer to the data aggregation.

Param user_data:

[in] Pointer to custom data for callback functions.

typedef void (*agg_typed_publish_t)(const void *data, void *user_data)

Function to publish the data.

Param data:

[in] Pointer to the data to be published.

Param user_data:

[in] Pointer to custom data for callback functions.

Functions

void agg_update(struct agg *agg, int idx)

Signal that a data item in the aggregation has been updated.

Parameters:
  • agg[inout] Pointer to agg.

  • idx[in] Index of the updated data item.

void agg_period_timer_cb(struct k_timer *timer)

Timer callback function for periodic publishing.

Warning

Internal use only.

Parameters:
  • timer[inout] Timer.

void agg_early_timer_cb(struct k_timer *timer)

Timer callback function for tracking minimum separation time.

Warning

Internal use only.

Parameters:
  • timer[inout] Timer.

void agg_work_cb(struct k_work *work)

Work callback function for the bottom half of publishing, also used for late publishing.

Warning

Internal use only.

Parameters:
  • work[inout] Work.

void agg_typed_publish(struct agg *agg, void *user_data)

Publish function for data aggregation.

Warning

Internal use only.

struct agg
#include <nturt/msg/aggregation.h>

Data aggregation.

Public Members

const char *name

Name of the aggregation.

const int flag

Flag of the aggregation.

const size_t num_member

Number of members to be monitored for updating.

const uint8_t *const member_flags

Flags of the members.

const uint32_t fully_updated

Value of agg::updated when every member has been updated.

const k_timeout_t period

Period of data publishing.

const k_timeout_t min_separation

Minimum separation time between two consecutive publications.

const k_timeout_t watermark

Watermark to wait for late-arriving members.

const agg_publish_t publish

Function to publish the data.

void *const user_data

User data for the callback.

struct k_spinlock lock

Spinlock to protect the following struct members.

struct k_timer period_timer

Timer for periodic publishing.

struct k_timer early_timer

Timer for tracking minimum separation time.

struct k_work_delayable work

Work for the bottom half of publishing, also used for late publishing.

uint32_t updated

Which members have been updated, one bit per member.

struct agg_typed
#include <nturt/msg/aggregation.h>

Data aggregation for a specific type.

Public Members

const size_t data_size

Size of the data.

const uint8_t *const map

Map from the offset of a member within the type to the index of the corresponding data item in agg.

const agg_typed_publish_t publish

Function to publish the data.

struct k_spinlock lock

Spinlock to protect the following struct members.

struct agg agg

Data aggregation.

void *const data

Pointer to the buffer for updating.

void *const pub_data

Pointer to the buffer for publishing.