Data Aggregation
Overview
The data aggregation module provides a flexible mechanism for monitoring,
collecting, and publishing updates to a collection of data or struct members.
It is designed to track changes of individual data items, aggregate updates, and
trigger publishing actions based on configurable timing and update policies.
This is particularly useful for aggregating data scattered from various sources,
such as sensor data from ADC and CAN bus, which must be collected and sent at
regular intervals or upon change. This module can be added to any application by
enabling the CONFIG_NTURT_MSG
Kconfig option.
Core concepts
Timing Parameters
There are three timing parameters that control the behavior of the aggregation:
Period: The typical interval at which the aggregated data is published.
Minimum Separation: The minimum time between two consecutive publications.
Watermark: The additional time to wait for late-arriving data.
Timing parameters for data aggregation.
Basically, only after every data item in an aggregation are updated will the aggregated data be published. However, if after Period time has elapsed since the last publication and some data items are still not updated, additional Watermark time will be waited for late-arriving data. If after that time some data items are still not updated, the aggregated data will be published anyway, using the last known values of those data items.
If the updates of specific data items are not necessary,
AGG_MEMBER_FLAG_IGNORED
or AGG_MEMBER_FLAG_OPTIONAL
flags
can be set in AGG_MEMBER
to mark a member as ignored or optional.
Since the data aggregation module is designed to be used for processing real-time data, aggregated data is published immediately after all data items are updated. And Minimum Separation is used to ensure that the publication does not happen too frequently. Hence the aggregated data can only be published in the Publish Interval.
Dormant and Cold Start
If after Period plus Watermark time has elapsed and no data items were updated, the aggregation will be dormant, meaning it will stop publishing until any one of its data items is updated again. This is useful to stop unnecessary publication when the source modules are not active or the data is not changing.
After the aggregation is cold started by an update of a data item (including
data marked by AGG_MEMBER_FLAG_OPTIONAL
), it will only wait
Watermark time for other data items to be updated before publishing.
Dormant can be turned off by setting AGG_FLAG_ALWAYS_PUBLISH
flag in
AGG_DEFINE
or AGG_TYPED_DEFINE
, which will force the
aggregation to publish every Period plus Watermark even if no data items
are updated.
Note
Currently, the aggregation module does not start automatically even if
AGG_FLAG_ALWAYS_PUBLISH
is set. It will only start when
the first data item is updated. This may be changed in the future to
automatically start the aggregation after initialization.
Data Update
If one data item is updated multiple times before next publication, only the latest value will be published. This is to ensure that the aggregated data reflects the most recent state of the system. If a data item is not updated before the next publication, its last known value will be used.
Runtime Behavior
Execution Contex: Publishing the aggregated data is done in Zephyr
System Work Queue
, which is a high-priority thread designed for bottom- half of interrupt handlers. Hence the publish function should be non-blocking and fast to avoid blocking the system.
Usage
Defining an Aggregation
The aggregation module can be used in two main ways:
For external or unrelated data items, use the
agg
and define it usingAGG_DEFINE
or initialize it withAGG_INITIALIZER
within a struct. This allows you to aggregate updates to data items by their index, suitable for cases where the data items are not part of a single struct.For struct members, use
AGG_TYPED_DEFINE
to define a typed aggregation. This macro sets up an aggregation for a specific struct type, allowing you to monitor and update individual members. This is useful for aggregating updates to fields within a message in message types or data structure.
Suppose we have a struct representing a message:
struct my_msg {
int foo;
struct {
float x;
float y;
} bar;
};
An aggregation can be defined to monitor updates to these members:
AGG_TYPED_DEFINE(my_msg_agg, struct my_msg,
AGG_DATA_INIT({0, {0.0f, 0.0f}}), // initial value
K_MSEC(100), // period
K_MSEC(10), // minimum separation
K_MSEC(20), // watermark
0, // aggregation flags
my_publish_func, // publish callback
NULL, // user data for the callback
AGG_MEMBER(foo), // members to monitor
AGG_MEMBER(bar.x, AGG_MEMBER_FLAG_OPTIONAL)
);
Note
Not all members need to be monitored. But if a member is not monitored, only the initial value will be used when the aggregation is published.
Updating Members
To signal that a member has been updated, use agg_update()
for
agg
or AGG_TYPED_UPDATE
for typed aggregations:
AGG_TYPED_UPDATE(&my_msg_agg, struct my_msg, foo, 42);
AGG_TYPED_UPDATE(&my_msg_agg, struct my_msg, bar.x, 3.14F);
Warning
Only members declared in the AGG_TYPED_DEFINE
can be updated. If
a unknown member is updated, an assertion will fail at runtime.
Publish Function
The aggregation module publishes the aggregated data by calling a user-defined
publish function of type agg_publish_t
for agg
or
agg_typed_publish_t
for typed aggregations:
void my_publish(const void *data, void *user_data) {
const struct my_msg *agg = data;
// publish the aggregated data
}
API Reference
- group Aggregation
Data aggregation.
Defines
-
AGG_FLAG_ALWAYS_PUBLISH
Flag indicating the aggregation will always publish the data, even if no members are updated.
-
AGG_MEMBER_FLAG_IGNORED
Flag indicating the aggregation will not monitor the update of the member.
-
AGG_MEMBER_FLAG_OPTIONAL
Flag indicating the aggregation will not wait for the member to be updated before publishing. However, updates to the member will cold start the aggregation from dormant.
-
AGG_INITIALIZER(_obj, _name, _period, _min_separation, _watermark, _flag, _publish, _user_data, ...)
Static initializer for a dataa aggregation. Refer to AGG_DEFINE for detailed parameter descriptions.
- Parameters:
_obj – [in] Object to be initialized.
-
AGG_DEFINE(name, period, min_separation, watermark, flag, publish, user_data, ...)
Define a data aggregation named
_name
to monitor the update of data. May be specified asstatic
to limit the scope of the aggregation.- Parameters:
name – [in] Name of the aggregation.
period – [in] Period of data publishing.
min_separation – [in] Minimum separation time between two data publishing.
watermark – [in] Watermark to wait for late-arriving members.
flag – [in] Flag of the aggregation. If no flag is required, 0 should be specified, and multiple flags can be combined by using the bitwise OR operator (|).
publish – [in] Function to publish the data, must be of type agg_publish_t.
user_data – [in] Pointer to custom data for the callback.
... – [in] Flags of the data to be monitored, where each flag represents a data to be monitored. If the data does not require flag, 0 should be specified. The flags can be combined by using the bitwise OR operator (|).
-
AGG_MEMBER(member, ...)
Specify a member of a struct to be monitored for aggregation. Used in AGG_TYPED_DEFINE.
- Parameters:
member – [in] Member of the struct to be monitored.
... – [in] Optional flags of the member, multiple flags can be specified by using the bitwise OR operator (|).
-
AGG_DATA_INIT(val, ...)
Intial value of the data. Used in AGG_TYPED_DEFINE.
- Parameters:
val – [in] Initialization list of the data.
- Returns:
Initial value of the data.
-
AGG_TYPED_DEFINE(_name, _type, _init_val, _period, _min_separation, _watermark, _flag, _publish, _user_data, ...)
Define a data aggregation named
_name
to monitor the update of members within a data type. May be specified asstatic
to limit the scope of the aggregation.- Parameters:
_name – [in] Name of the aggregation.
_type – [in] Data type to be monitored.
_init_val – [in] Initial value of the data, must be a specified by AGG_DATA_INIT.
_period – [in] Period of data publishing.
_min_separation – [in] Minimum separation time between two data publishing.
_watermark – [in] Watermark to wait for late-arriving members.
_flag – [in] Flag of the aggregation. If no flag is required, 0
_publish – [in] Function to publish the data, must be of type agg_typed_publish_t.
_user_data – [in] Pointer to custom data for the callback.
... – [in] Members of
_type
to be monitored, must be specified by AGG_MEMBER.
-
AGG_TYPED_UPDATE(agg_typed, type, member, value)
Update a member of data.
- Parameters:
agg_typed – Pointer to the data aggregation.
type – [in] Type of the data, must be the same as the type specified in AGG_TYPED_DEFINE.
member – [in] Member of the data to be updated, must be listed in AGG_TYPED_DEFINE.
value – [in] New value of the member.
Typedefs
-
typedef void (*agg_publish_t)(struct agg *agg, void *user_data)
Function to publish the data.
- Param agg:
[in] Pointer to the data aggregation.
- Param user_data:
[in] Pointer to custom data for callback functions.
-
typedef void (*agg_typed_publish_t)(const void *data, void *user_data)
Function to publish the data.
- Param data:
[in] Pointer to the data to be published.
- Param user_data:
[in] Pointer to custom data for callback functions.
Functions
-
void agg_update(struct agg *agg, int idx)
Signal the update of a data in aggregation.
- Parameters:
agg – [inout] Pointer to agg.
idx – [in] Index of the data that is updated.
-
void agg_period_timer_cb(struct k_timer *timer)
Timer callback function for periodic publishing.
Warning
Internal use only.
- Parameters:
timer – [inout] Timer.
-
void agg_early_timer_cb(struct k_timer *timer)
Timer callback function for tracking minimum separation time.
Warning
Internal use only.
- Parameters:
timer – [inout] Timer.
-
void agg_work_cb(struct k_work *work)
Work callback function for the bottom half of publishing, also used for late publishing.
Warning
Internal use only.
- Parameters:
work – [inout] Work.
-
struct agg
- #include <nturt/msg/aggregation.h>
Data aggregation.
Public Members
-
const char *name
Name of the aggregation.
-
const int flag
Flag of the aggregation.
-
const size_t num_member
Number of members to be monitored for updating.
-
const uint8_t *const member_flags
Flags of the members.
-
const uint32_t fully_updated
What agg::updated should be when every members are updated.
-
const k_timeout_t period
Period of data publishing.
-
const k_timeout_t min_separation
Minimum separation time between two data publishing.
-
const k_timeout_t watermark
Watermark to wait for late-arriving members.
-
const agg_publish_t publish
Function to publish the data.
-
void *const user_data
User data for the callback.
-
struct k_spinlock lock
Spinlock to protect the following struct members.
-
struct k_timer period_timer
Timer for periodic publishing.
-
struct k_timer early_timer
Timer for tracking minimum separation time.
-
struct k_work_delayable work
Work for the bottom half of publishing, also used for late publishing.
-
uint32_t updated
Which member has been updated, each bit represents one member.
-
const char *name
-
struct agg_typed
- #include <nturt/msg/aggregation.h>
Data aggregation for a specific type.
Public Members
-
const size_t data_size
Size of the data.
-
const agg_typed_publish_t publish
Function to publish the data.
-
struct k_spinlock lock
Spinlock to protect the following struct members.
-
void *const data
Pointer to the buffer for updating.
-
void *const pub_data
Pointer to the buffer for publishing.
-
const size_t data_size
-
AGG_FLAG_ALWAYS_PUBLISH