Chapter 294: Device Telemetry Systems

Chapter Objectives

By the end of this chapter, you will be able to:

  • Design a scalable and efficient data schema for telemetry messages.
  • Implement on-device buffering and data aggregation to reduce power consumption and data costs.
  • Understand the role of data compression in constrained environments.
  • Build a telemetry system that can prioritize and send critical alerts immediately.
  • Appreciate the importance of accurate time synchronization for telemetry data.
  • Adapt telemetry strategies based on the capabilities of different ESP32 variants (e.g., PSRAM).

Introduction

In the previous chapter, we built a basic remote monitoring system that periodically sent device vital signs to the cloud. While functional for a single device, this simple approach reveals its weaknesses when scaled to a fleet of thousands or even millions of devices. Sending a separate network message for every single metric every few minutes creates a torrent of data that is inefficient, expensive, and power-hungry. The cloud backend groans under the load, and device battery life plummets.

To build a truly professional IoT product, we must evolve from simply sending telemetry to architecting a system for it. This involves thinking critically about what data we send, how we format it, when we send it, and how we can make the entire process as efficient as possible.

This chapter focuses on building these scalable telemetry systems. We will explore advanced techniques like data buffering, aggregation, and schema design that form the bedrock of any large-scale, production-grade IoT deployment.

Theory

A scalable telemetry system is built upon four key pillars: a well-defined data schema, intelligent on-device data handling, robust timekeeping, and a strategy for handling critical events.

1. Data Schema Design

An ad-hoc JSON object works for a prototype, but in production, it leads to chaos. A formal data schema is an explicit contract between the device and the cloud.

  • Consistency: Every device sends data in the exact same structure. This dramatically simplifies cloud-side processing, as the parsing logic is fixed and predictable.
  • Versioning: As you add new features or metrics, you will need to change the schema. A version number in every payload (e.g., "schema_v": 2) allows the cloud to gracefully handle data from devices running different firmware versions.
  • Efficiency: For ultimate efficiency, binary serialization formats like Protocol Buffers (Protobuf) or MessagePack can replace JSON. They create much smaller payloads and are faster to parse, but come at the cost of being less human-readable. For many applications, a well-structured JSON is a good balance.
graph TD
    subgraph "Inefficient Ad-Hoc Approach"
        A[Device 1 sends data]
        B{{"{<br>  'temp': 25.5,<br>  'heap': 35400,<br>  'device': 'sensor-A'<br>}"}}
        C[Device 2 sends data]
        D{{"{<br>  'temperature': 26.1,<br>  'free_heap': 32100,<br>  'rssi': -65<br>}"}}
        E((Cloud Backend))
        A --> B --> E
        C --> D --> E
        F{Parsing Logic?}
        E -- "Inconsistent fields!<br><b>'temp' vs 'temperature'</b><br><b>'heap' vs 'free_heap'</b><br>Missing data!" --> F
    end

    subgraph " "
        style Z fill:none,stroke:none
        Z[" "]
    end

    subgraph "Efficient Schema-Based Approach"
        G[All Devices Send Data]
        H{{"{<br>  'schema_v': 2,<br>  'device_id': 'sensor-A',<br>  'records': [<br>    { 'ts': 1678886400, 'heap': 35400, 'rssi': -62 },<br>    { 'ts': 1678886460, 'heap': 35150, 'rssi': -63 }<br>  ]<br>}"}}
        I((Cloud Backend))
        J[Stable Parsing Logic]
        G --> H --> I --> J
    end

    classDef default fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF
    classDef check fill:#FEE2E2,stroke:#DC2626,stroke-width:2px,color:#991B1B
    classDef good fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46
    class A,C,G,E,I,J default
    class B,D,H default
    class F check
    class J good
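
To make the efficiency trade-off concrete, the following sketch serializes one record two ways: as JSON text and as a fixed-width binary layout. The binary layout is a hand-rolled, hypothetical stand-in used only to show the size difference; it is not Protobuf or MessagePack. The names telemetry_record_t, encode_json, and encode_binary are illustrative assumptions, not part of any library.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

// One telemetry record (hypothetical layout for illustration).
typedef struct {
    uint32_t ts;    // UTC Unix timestamp in seconds
    uint32_t heap;  // free heap in bytes
    int8_t   rssi;  // Wi-Fi RSSI in dBm
} telemetry_record_t;

// Writes the record as a JSON object.
// Returns the text length (excluding the NUL), or 0 if `out` is too small.
size_t encode_json(const telemetry_record_t *r, char *out, size_t cap) {
    int n = snprintf(out, cap, "{\"ts\":%lu,\"heap\":%lu,\"rssi\":%d}",
                     (unsigned long)r->ts, (unsigned long)r->heap, (int)r->rssi);
    return (n < 0 || (size_t)n >= cap) ? 0 : (size_t)n;
}

// Stores a 32-bit value in little-endian byte order.
static void put_u32_le(uint8_t *p, uint32_t v) {
    p[0] = (uint8_t)(v & 0xFF);
    p[1] = (uint8_t)((v >> 8) & 0xFF);
    p[2] = (uint8_t)((v >> 16) & 0xFF);
    p[3] = (uint8_t)((v >> 24) & 0xFF);
}

// Fixed layout: ts (4 bytes LE), heap (4 bytes LE), rssi (1 byte).
#define BINARY_RECORD_SIZE 9

// Returns BINARY_RECORD_SIZE, or 0 if `out` is too small.
size_t encode_binary(const telemetry_record_t *r, uint8_t *out, size_t cap) {
    if (cap < BINARY_RECORD_SIZE) {
        return 0;
    }
    put_u32_le(out, r->ts);
    put_u32_le(out + 4, r->heap);
    out[8] = (uint8_t)r->rssi;
    return BINARY_RECORD_SIZE;
}
```

For the record { ts: 1678886400, heap: 35400, rssi: -62 }, the JSON form is 41 bytes and the binary form is 9 bytes. The price is that the cloud must know the exact layout, which is why a schema version matters even more for binary formats.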

2. On-Device Data Handling: Buffering and Aggregation

The single most important optimization is to stop sending data immediately. Instead, the device should collect metrics locally and send them in larger, less frequent batches.

  • Buffering: Instead of publishing a metric, the device stores it in a local buffer, typically an array in RAM.
  • Aggregation: This is the process of sending the entire buffer as a single payload. For example, instead of sending 60 messages over an hour, the device can collect a metric every minute and send a single message containing an array of 60 data points at the end of the hour.
  • Triggering Policies: When should the buffer be sent?
    • Time-based: Publish every N minutes (e.g., every 30 minutes). This is the most common approach.
    • Size-based: Publish when the buffer reaches a certain size (e.g., contains 100 records).
    • Hybrid: Publish every N minutes OR when the buffer is full, whichever comes first.

The benefits are enormous:

  • Power Savings: The radio is typically a device’s most power-hungry component. Powering it up once an hour instead of every minute dramatically extends battery life.
  • Cost Reduction: For devices on cellular plans, data costs are directly tied to the number of transmissions and total bytes sent. Aggregation drastically reduces both.
  • Reduced Cloud Ingestion: The cloud backend processes fewer, larger messages, which is often more efficient and cheaper.
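
A rough way to see the saving is to count bytes. The sketch below assumes each message carries a fixed overhead (protocol headers, topic name, framing) on top of its records. The figures used in the note after it (80 bytes of overhead, 40 bytes per record) are hypothetical placeholders, and bytes_on_wire is an illustrative helper, not a library function.

```c
#include <stdint.h>

// Total bytes sent for `records` data points when `per_message` records
// are packed into each message. `overhead_bytes` is the fixed cost paid
// once per message, regardless of how many records it carries.
uint32_t bytes_on_wire(uint32_t records, uint32_t per_message,
                       uint32_t record_bytes, uint32_t overhead_bytes) {
    if (per_message == 0) {
        return 0; // invalid configuration
    }
    // Ceiling division: a partially filled final message still costs overhead.
    uint32_t messages = (records + per_message - 1) / per_message;
    return messages * overhead_bytes + records * record_bytes;
}
```

With those placeholder figures, 60 records sent one at a time cost 60 × 80 + 60 × 40 = 7200 bytes, while one batch of 60 costs 80 + 60 × 40 = 2480 bytes. The model ignores connection setup and radio wake-up time, which usually make the real gap larger.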

| Policy | Description | Pros | Cons |
| --- | --- | --- | --- |
| Time-based | Publish the buffer at a fixed interval (e.g., every 30 minutes). | Predictable load on the backend. Simple to implement. Guarantees data freshness up to the interval period. | Can be inefficient if little data is collected during an interval. May send nearly empty payloads, wasting energy. |
| Size-based | Publish the buffer only when it reaches a certain number of records. | Maximizes data-per-transmission efficiency. Ideal for minimizing connection costs. Reduces the number of network connections. | Data can become very stale if collection rate is low. Unpredictable publish times for the backend. |
| Hybrid | Publish every N minutes OR when the buffer is full, whichever comes first. | Best of both worlds: ensures data freshness while aiming for efficiency. Prevents data from becoming too stale. Avoids sending tiny payloads. | Slightly more complex logic to implement and test. |
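
The hybrid policy reduces to a single decision function. The following is a minimal sketch, assuming the caller tracks the record count and the time of the last publish; publish_policy_t and should_publish are hypothetical names chosen for this illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
    size_t   capacity;        // size trigger: publish at this many records
    uint32_t max_interval_s;  // time trigger: publish at least this often
} publish_policy_t;

// Returns true when the buffer should be sent now.
bool should_publish(const publish_policy_t *p, size_t count,
                    uint32_t now_s, uint32_t last_publish_s) {
    if (count == 0) {
        return false;                 // nothing to send, keep the radio off
    }
    if (count >= p->capacity) {
        return true;                  // size trigger
    }
    // Time trigger. Unsigned subtraction stays correct if the counter wraps.
    return (uint32_t)(now_s - last_publish_s) >= p->max_interval_s;
}
```

Setting capacity to a very large value gives a purely time-based policy, and setting max_interval_s to UINT32_MAX gives an effectively size-based one, so the same function can cover all three rows of the table.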

3. The Importance of Time

When you buffer data, the timestamp becomes critical. A message containing data from the last hour needs to specify exactly when each data point was collected.

  • Time Synchronization: The device must have accurate time. The SNTP (Simple Network Time Protocol) client in ESP-IDF is used to sync the device’s internal clock with a network time server. This should be done shortly after connecting to the network.
  • Timestamps: Every record in your telemetry buffer must include a timestamp, typically in UTC Unix format. Relying on the “server received time” is not sufficient, as it doesn’t account for how long the data was buffered on the device.
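
Two small helpers illustrate both points. The first rejects timestamps taken before the clock was set; the second computes how long a record sat on the device, which is exactly the error you would introduce by using the server's receive time instead. The cut-off value and both function names are assumptions made for this sketch.

```c
#include <stdbool.h>
#include <time.h>

// Anything earlier than this is treated as "clock not yet synchronized".
// 1700000000 is 2023-11-14 UTC. The value is an assumption: choose a date
// no later than your firmware build date.
#define MIN_PLAUSIBLE_UNIX_TIME ((time_t)1700000000)

// An unsynced device clock starts near zero and counts up with uptime,
// so a plausibility check must compare against a recent date.
bool time_is_plausible(time_t t) {
    return t >= MIN_PLAUSIBLE_UNIX_TIME;
}

// Seconds a record waited in the buffer before the server received it.
long buffering_delay_s(time_t collected_at, time_t received_at) {
    return (long)difftime(received_at, collected_at);
}
```

A record collected at the start of an hourly batch has a buffering delay of about 3600 seconds, so stamping it with the receive time would place it an hour late in any time-series view.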

4. Prioritizing Critical Events

Not all data is created equal. While regular heap size readings can be buffered, a critical error like a flash write failure or a dangerous temperature reading cannot wait for the next aggregation interval.

A robust system has a “fast lane” for priority events. When a critical event occurs, the system should:

  1. Immediately package the event into a high-priority message.
  2. Send this message right away, bypassing the regular buffer.
  3. Optionally, the system might also decide to flush the regular telemetry buffer at the same time.
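
The three steps above can be sketched as a small routing decision. The severity levels, the dispatch_t structure, and dispatch_event are hypothetical names for this illustration; a real system would attach the actual publish calls to each outcome.

```c
#include <stdbool.h>

typedef enum { SEVERITY_INFO, SEVERITY_WARNING, SEVERITY_CRITICAL } severity_t;
typedef enum { ROUTE_BUFFER, ROUTE_FAST_LANE } route_t;

typedef struct {
    route_t route;        // where the event itself goes
    bool    flush_buffer; // whether to also send the regular buffer now
} dispatch_t;

// Decides how an event is handled. Only critical events bypass the buffer.
// `flush_on_critical` models the optional third step: sending the buffered
// telemetry along with the alert so the cloud gets the surrounding context.
dispatch_t dispatch_event(severity_t severity, bool flush_on_critical) {
    dispatch_t d = { ROUTE_BUFFER, false };
    if (severity == SEVERITY_CRITICAL) {
        d.route = ROUTE_FAST_LANE;
        d.flush_buffer = flush_on_critical;
    }
    return d;
}
```

Keeping the decision separate from the transport makes it easy to unit-test the policy on a host machine before wiring it to MQTT.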

Practical Example: Buffered Telemetry with Priority Events

Let’s build a more advanced telemetry system that incorporates buffering and a priority alert for low memory.

1. System Design

  • We will collect heap and RSSI metrics every 30 seconds.
  • These metrics will be stored in a buffer that can hold 10 records.
  • The buffer will be published every 5 minutes OR when it becomes full.
  • If the minimum free heap drops below a critical threshold (e.g., 20KB), a priority alert will be sent immediately.
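
It is worth checking how the two publish triggers interact for a given configuration. The helper below (a hypothetical name, written for this sketch) computes how long the buffer takes to fill so it can be compared with the publish interval.

```c
#include <stdint.h>

// Seconds needed to fill a buffer of `capacity` records when one record
// is collected every `collection_interval_s` seconds.
uint32_t buffer_fill_time_s(uint32_t capacity, uint32_t collection_interval_s) {
    return capacity * collection_interval_s;
}

// Seconds until the first of the two triggers fires after a publish.
uint32_t first_trigger_s(uint32_t capacity, uint32_t collection_interval_s,
                         uint32_t publish_interval_s) {
    uint32_t fill = buffer_fill_time_s(capacity, collection_interval_s);
    return (fill < publish_interval_s) ? fill : publish_interval_s;
}
```

With the values chosen here, 10 records × 30 seconds is 300 seconds, the same as the 5-minute publish interval, so both triggers fall due at about the same moment. That is why it matters to restart the timer after a size-triggered publish. To observe the time trigger on its own, you could enlarge the buffer (for example to 20 records).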

2. The Code

flowchart TD
    subgraph "Initialization"
        A[Start Task] --> B{Sync Time via SNTP};
        B -- Success --> C[Start 5-min Publish Timer];
    end

    subgraph "Main Loop"
        C --> D{Wait for 30s<br>Collection Interval};
        D --> E{Check Priority Conditions<br>e.g., min_heap < 20KB?};
        E -- Yes --> F[Send Priority Alert<br>IMMEDIATELY];
        F --> G{"Buffer has space?<br>(count < 10)"};
        E -- No --> G;
        
        G -- Yes --> H["Collect Metrics<br> (heap, RSSI)"];
        H --> I[Add Record to Buffer];
        I --> J[Increment buffer_count];
        J --> K{"Buffer is now full?<br>(count >= 10)"};

        G -- No --> K;
        
        K -- Yes --> L[Publish Entire Buffer];
        L --> M[Reset buffer_count = 0];
        M --> N[Reset 5-min Publish Timer];
        N --> D;

        K -- No --> D;
    end

    subgraph "Periodic Trigger"
        P(5-min Timer Expires) --> L;
    end

    classDef startNode fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6
    classDef processNode fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF
    classDef decisionNode fill:#FEF3C7,stroke:#D97706,stroke-width:1px,color:#92400E
    classDef alertNode fill:#FEE2E2,stroke:#DC2626,stroke-width:2px,color:#991B1B
    classDef publishNode fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46

    class A,B,C,P startNode
    class D,H,I,J,M,N processNode
    class E,G,K decisionNode
    class F alertNode
    class L publishNode

This example assumes you have a cJSON component and a working, connected mqtt_client.

C
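#include <stdbool.h>    // bool
#include <stdio.h>      // snprintf
#include <stdlib.h>     // free
#include <time.h>       // time, time_t
#include <sys/time.h>   // struct timeval (SNTP callback argument)
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "esp_system.h"
#include "esp_log.h"
#include "esp_wifi.h"
#include "esp_heap_caps.h"
#include "esp_sntp.h"
#include "cJSON.h"
#include "mqtt_client.h" // ESP-MQTT client API (esp_mqtt_client_publish, ...)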

// Assume mqtt_client is initialized and connected elsewhere
extern esp_mqtt_client_handle_t mqtt_client;
extern char device_id[]; // A string holding the unique device ID

static const char *TAG = "TELEMETRY_SYS";

// --- Configuration ---
#define SCHEMA_VERSION 1
#define BUFFER_SIZE 10
#define METRIC_COLLECTION_INTERVAL_S 30
#define BUFFER_PUBLISH_INTERVAL_S 300 // 5 minutes
#define CRITICAL_HEAP_THRESHOLD_BYTES (20 * 1024)

// --- Data Structures ---
typedef struct {
    time_t timestamp;
    size_t free_heap;
    int8_t wifi_rssi;
} metric_record_t;

// --- Global State ---
static metric_record_t telemetry_buffer[BUFFER_SIZE];
static int buffer_count = 0;
static TimerHandle_t publish_timer;

// Forward declarations
void publish_buffer(void);

// --- Time Sync ---
// Called by the SNTP client each time the system clock is updated.
void time_sync_notification_cb(struct timeval *tv) {
    ESP_LOGI(TAG, "Time successfully synchronized");
}

void initialize_sntp(void) {
    ESP_LOGI(TAG, "Initializing SNTP");
    esp_sntp_setoperatingmode(ESP_SNTP_OPMODE_POLL);
    esp_sntp_setservername(0, "pool.ntp.org");
    // Note: this function keeps the plain sntp_ prefix in esp_sntp.h.
    sntp_set_time_sync_notification_cb(time_sync_notification_cb);
    esp_sntp_init();
}

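// --- Publishing Logic ---
// Publishes a NUL-terminated payload with QoS 1, not retained.
// Passing len = 0 lets the client compute the length with strlen().
void publish_payload(const char* topic, const char* payload) {
    if (payload == NULL) {
        ESP_LOGE(TAG, "No payload for %s (serialization failed?)", topic);
        return;
    }
    int msg_id = esp_mqtt_client_publish(mqtt_client, topic, payload, 0, 1, 0);
    if (msg_id < 0) {
        ESP_LOGE(TAG, "Publish to %s failed", topic);
    } else {
        ESP_LOGI(TAG, "Published to %s: %s", topic, payload);
    }
}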

void publish_priority_alert(const char* alert_message) {
    char topic[128];
    snprintf(topic, sizeof(topic), "devices/%s/alerts", device_id);

    cJSON *root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "timestamp", time(NULL));
    cJSON_AddStringToObject(root, "message", alert_message);
    cJSON_AddNumberToObject(root, "min_free_heap", heap_caps_get_minimum_free_size(MALLOC_CAP_DEFAULT));

    char* payload = cJSON_PrintUnformatted(root);
    publish_payload(topic, payload);
    free(payload);
    cJSON_Delete(root);
}

void publish_buffer(void) {
    if (buffer_count == 0) {
        return; // Nothing to send
    }

    char topic[128];
    snprintf(topic, sizeof(topic), "devices/%s/telemetry", device_id);

    cJSON *root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "schema_v", SCHEMA_VERSION);
    cJSON_AddStringToObject(root, "device_id", device_id);
    cJSON *records = cJSON_AddArrayToObject(root, "records");

    for (int i = 0; i < buffer_count; i++) {
        cJSON *record_obj = cJSON_CreateObject();
        cJSON_AddNumberToObject(record_obj, "ts", telemetry_buffer[i].timestamp);
        cJSON_AddNumberToObject(record_obj, "heap", telemetry_buffer[i].free_heap);
        cJSON_AddNumberToObject(record_obj, "rssi", telemetry_buffer[i].wifi_rssi);
        cJSON_AddItemToArray(records, record_obj);
    }

    char* payload = cJSON_PrintUnformatted(root);
    publish_payload(topic, payload);
    free(payload);
    cJSON_Delete(root);
    
    // Reset buffer
    buffer_count = 0;
}

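// --- Timer Callbacks ---
// The callback runs in the FreeRTOS timer service task, which has a small
// stack and would race with the collection task on telemetry_buffer.
// It therefore only raises a flag; the collection task performs the publish
// on its next wake-up (at most one collection interval later).
static volatile bool publish_due = false;

void publish_timer_callback(TimerHandle_t xTimer) {
    publish_due = true;
}

void metrics_collection_task(void *pvParameters) {
    initialize_sntp(); // Start time sync process

    // Wait until the clock holds a plausible date. An unsynced clock counts up
    // from zero at boot, so a small threshold such as 1000 would eventually be
    // passed by uptime alone. 1700000000 is 2023-11-14 UTC.
    const time_t min_valid_time = 1700000000;
    while (time(NULL) < min_valid_time) {
        ESP_LOGI(TAG, "Waiting for time sync...");
        vTaskDelay(pdMS_TO_TICKS(2000));
    }

    publish_timer = xTimerCreate("publish_timer", pdMS_TO_TICKS(BUFFER_PUBLISH_INTERVAL_S * 1000),
                                 pdTRUE, // Auto-reload
                                 NULL, publish_timer_callback);
    if (publish_timer == NULL || xTimerStart(publish_timer, 0) != pdPASS) {
        ESP_LOGE(TAG, "Failed to start publish timer");
    }

    bool heap_alert_sent = false;

    while (1) {
        vTaskDelay(pdMS_TO_TICKS(METRIC_COLLECTION_INTERVAL_S * 1000));

        // 1. Check for priority conditions. The minimum free heap is a
        //    low-water mark that never recovers, so alert once, not every cycle.
        size_t min_heap = heap_caps_get_minimum_free_size(MALLOC_CAP_DEFAULT);
        if (min_heap < CRITICAL_HEAP_THRESHOLD_BYTES && !heap_alert_sent) {
            publish_priority_alert("CRITICAL: Minimum free heap is dangerously low!");
            heap_alert_sent = true;
        }

        // 2. Collect regular metrics if buffer has space
        if (buffer_count < BUFFER_SIZE) {
            metric_record_t *record = &telemetry_buffer[buffer_count];
            record->timestamp = time(NULL);
            record->free_heap = heap_caps_get_free_size(MALLOC_CAP_DEFAULT);

            // RSSI is reported as 0 when the station is not associated
            wifi_ap_record_t ap_info;
            record->wifi_rssi = (esp_wifi_sta_get_ap_info(&ap_info) == ESP_OK) ? ap_info.rssi : 0;

            ESP_LOGI(TAG, "Buffered record #%d", buffer_count + 1);
            buffer_count++;
        }

        // 3. Publish when the buffer is full or the periodic timer has fired
        if (buffer_count >= BUFFER_SIZE || publish_due) {
            ESP_LOGI(TAG, "Publishing buffer (%d records).", buffer_count);
            publish_due = false;
            publish_buffer();
            // Restart the periodic publish timer to avoid a quick successive publish
            xTimerReset(publish_timer, 0);
        }
    }
}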

3. Build and Run

  1. Ensure SNTP is enabled in menuconfig.
  2. Add the metrics_collection_task to your app_main.
  3. Flash and run the application.
  4. Observe:
    • The device will first wait to sync its time.
    • Every 30 seconds, it will log “Buffered record #N”.
    • Every 5 minutes (or when 10 records are buffered), it will publish a large JSON array to the .../telemetry topic.
    • To test the alert, create a task that temporarily allocates a large chunk of memory to drive the minimum free heap down. You should see an immediate message on the .../alerts topic.

Variant Notes

  • PSRAM is a Game-Changer: On variants that support external PSRAM (for example ESP32, ESP32-S2, and ESP32-S3 modules that include it), you can create vastly larger telemetry buffers. Instead of holding just 10 records, you could buffer thousands. This allows the device to stay offline for much longer periods, waking the radio only once every few hours or even once a day, which is transformative for battery-powered applications. You would allocate the buffer in PSRAM using heap_caps_malloc(..., MALLOC_CAP_SPIRAM).
  • Data Compression: For variants with strong computational power (ESP32-S3, ESP32-C6) and large telemetry payloads, on-device compression becomes viable. You can integrate a library like heatshrink or zlib (available as an ESP-IDF component) to compress the serialized JSON string before publishing it to MQTT. Repetitive JSON compresses well, so reductions of 70-90% are possible for large batches, but the actual gain depends on the payload and the compression itself consumes CPU and memory, so measure it on your own data.
  • Hardware Crypto Accelerators: All modern variants (S2, S3, C-series, H-series) have hardware acceleration for cryptographic operations (for example SHA, and AES, RSA, or ECC depending on the chip) that the TLS handshake of a secure MQTT connection relies on. This reduces the energy and time cost of connecting to the network to publish the telemetry buffer, making the aggregation strategy even more effective.
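
As a minimal sketch of the PSRAM point, the code below allocates a large record store and estimates how many hours of data it holds. On ESP-IDF builds it uses heap_caps_malloc with MALLOC_CAP_SPIRAM; on other platforms it falls back to malloc so the sizing logic can be tried on a host machine. The telemetry_store_t type and both functions are hypothetical names, and the record layout merely mirrors the one used in this chapter's example.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

#ifdef ESP_PLATFORM
#include "esp_heap_caps.h"
// Request memory from external PSRAM; returns NULL if none is available.
#define TELEMETRY_ALLOC(bytes) heap_caps_malloc((bytes), MALLOC_CAP_SPIRAM)
#else
// Host-build fallback so the example compiles outside ESP-IDF.
#define TELEMETRY_ALLOC(bytes) malloc(bytes)
#endif

typedef struct {
    time_t   timestamp;
    uint32_t free_heap;
    int8_t   wifi_rssi;
} metric_record_t;

typedef struct {
    metric_record_t *records;
    size_t capacity;
    size_t count;
} telemetry_store_t;

// Allocates room for `capacity` records. Returns 1 on success, 0 on failure.
int telemetry_store_init(telemetry_store_t *s, size_t capacity) {
    s->records = TELEMETRY_ALLOC(capacity * sizeof(metric_record_t));
    s->capacity = (s->records != NULL) ? capacity : 0;
    s->count = 0;
    return s->records != NULL;
}

// Whole hours of data the store can hold at one record per interval.
uint32_t offline_hours(size_t capacity, uint32_t collection_interval_s) {
    return (uint32_t)(((uint64_t)capacity * collection_interval_s) / 3600u);
}
```

For example, 2880 records at a 30-second interval cover 24 hours. Always check the allocation result, because a board without PSRAM (or with PSRAM disabled in the project configuration) will get NULL back.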

Common Mistakes & Troubleshooting Tips

| Mistake / Issue | Symptom(s) | Troubleshooting / Solution |
| --- | --- | --- |
| Forgetting Time Sync | Timestamps in telemetry data are zero or show dates like 1970-01-01. Data is useless for time-series analysis. | Always initialize SNTP early in your application startup, then wait in a loop until the system time is valid before starting the telemetry collection task: `initialize_sntp(); while (time(NULL) < 1700000000) { vTaskDelay(pdMS_TO_TICKS(1000)); }` Compare against a recent date (1700000000 is 2023-11-14 UTC) rather than a small number, because an unsynced clock counts up from zero with uptime. vTaskDelay() takes ticks, so convert milliseconds with pdMS_TO_TICKS(). |
| Incorrect Buffer Logic | Data is lost, sent multiple times, or the device crashes with an out-of-bounds error. The buffer never seems to empty or fills instantly. | Common error: forgetting to reset `buffer_count = 0;` after a successful publish. Double-check all boundary conditions. Ensure the buffer index is correctly incremented and reset. Add extensive logging to trace the buffer’s state. |
| “Thundering Herd” | Your cloud backend or database becomes unresponsive at specific times (e.g., on the hour). Network performance drops for all devices simultaneously. | Introduce jitter. Before starting the publish timer, wait for a small random delay that differs per device, which spreads the network load over time: `uint32_t random_delay_ms = esp_random() % 60000; /* 0-60s */ vTaskDelay(pdMS_TO_TICKS(random_delay_ms)); xTimerStart(publish_timer, 0);` |
| Ignoring Cellular Data Costs | The monthly bill for your IoT fleet’s SIM cards is unexpectedly and astronomically high. | Be extremely aggressive with data optimization: (1) use large buffers to send data infrequently; (2) compress payloads with libraries like zlib or heatshrink; (3) consider binary formats like Protobuf instead of JSON. |
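
Most buffer bugs disappear when every access goes through two small functions that enforce the bounds. The following is a minimal sketch using int items and a capacity of 4 for brevity; bounded_buffer_t, buffer_push, and buffer_drain are hypothetical names, not part of the chapter's example code.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define BUF_CAPACITY 4

typedef struct {
    int    items[BUF_CAPACITY];
    size_t count; // invariant: count <= BUF_CAPACITY
} bounded_buffer_t;

// Appends a value. Returns false (and stores nothing) when the buffer is full,
// so the write index can never run past the end of the array.
bool buffer_push(bounded_buffer_t *b, int value) {
    if (b->count >= BUF_CAPACITY) {
        return false;
    }
    b->items[b->count++] = value;
    return true;
}

// Copies up to `out_cap` of the oldest items into `out` and removes them.
// Any items that did not fit are moved to the front and kept for later.
// Returns the number of items copied.
size_t buffer_drain(bounded_buffer_t *b, int *out, size_t out_cap) {
    size_t n = (b->count < out_cap) ? b->count : out_cap;
    memcpy(out, b->items, n * sizeof(int));
    memmove(b->items, b->items + n, (b->count - n) * sizeof(int));
    b->count -= n;
    return n;
}
```

Because the count is only ever changed inside these two functions, forgetting to reset it after a publish is no longer possible, and a failed push is an explicit signal that a record was dropped.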

Exercises

  1. Implement Size-Based Trigger: Modify the example code. Remove the periodic publish timer (publish_timer). Instead, make the system publish the buffer only when buffer_count reaches BUFFER_SIZE.
  2. Stateful Buffer on Reboot: The current example loses its buffer on reboot. Modify it to save the telemetry_buffer and buffer_count to NVS before a planned shutdown. On startup, it should check NVS and restore the buffer, allowing it to continue collecting data without loss. (Note: This is for planned reboots; crash reboots would still lose data in RAM).
  3. Dynamic Telemetry Interval: Implement a remote diagnostic command. The device should listen on an MQTT topic for a command to change its metric collection interval. For example, receiving {"collection_interval_s": 10} on a topic should cause the device to update its collection interval and reconfigure its collection logic without needing a reboot. Because METRIC_COLLECTION_INTERVAL_S is a compile-time constant, you will first need to replace it with a variable that can be changed at runtime.

Summary

  • Transitioning from basic monitoring to a scalable telemetry system is crucial for production IoT devices.
  • A versioned Data Schema provides a stable contract between the device and the cloud.
  • Buffering and Aggregation are the most effective techniques for reducing power consumption and data costs by sending fewer, larger messages.
  • Accurate Time Synchronization via SNTP is mandatory for buffered data to be meaningful.
  • A robust system must have a priority channel to send critical alerts immediately, bypassing the normal buffer.
  • Hardware features like PSRAM and crypto accelerators on different ESP32 variants can be leveraged to create even more efficient telemetry strategies.
