Chapter 110: MQTT Advanced Message Patterns
Chapter Objectives
By the end of this chapter, you will be able to:
- Implement the Request/Response messaging pattern over MQTT using dedicated response topics and correlation IDs.
- Understand and design systems using Fan-out and Fan-in patterns for one-to-many and many-to-one message distribution.
- Incorporate message sequencing and correlation IDs to manage complex or multi-part message workflows.
- Describe the concept and benefits of Shared Subscriptions (an MQTT v5.0 feature) and their implications for client-side design.
- Design strategies for handling errors and reporting status in asynchronous MQTT communications.
- Apply these advanced patterns to build more robust and interactive ESP32 applications.
Introduction
So far in our MQTT journey, we’ve mastered the basics: publishing telemetry, subscribing to commands, ensuring message delivery with QoS, securing connections, and designing effective topic structures. These foundational skills enable a wide range of IoT applications. However, as systems grow in complexity, we often encounter scenarios requiring more sophisticated interaction models than simple fire-and-forget publishing or basic command reception.
How does a device request specific data from a server and get a direct reply? How can a command be broadcast to many devices simultaneously for coordinated action? How can multiple sensors report to a single processing unit? How do we ensure a sequence of messages arrives correctly or correlate a response to its original request in an asynchronous world?
This chapter delves into “Advanced Message Patterns” that address these needs. We’ll explore techniques like Request/Response, Fan-out/Fan-in, message sequencing, and error handling, all built upon the core MQTT publish-subscribe mechanism. Understanding these patterns will allow you to design more interactive, resilient, and intelligent IoT systems with your ESP32 devices.
Theory
While MQTT is fundamentally a publish-subscribe protocol, which is inherently asynchronous and decoupled, several well-established patterns can be implemented on top of it to achieve more complex communication workflows.
1. Request/Response Pattern
The most common “missing” piece for developers coming from synchronous protocols like HTTP is a direct request/reply mechanism. MQTT can simulate this effectively.
Concept:
A client (e.g., an ESP32) needs a specific piece of information from a server or wants to trigger an action and receive a direct confirmation or result.
- Request: The client publishes a request message to a predefined request topic. This message typically includes:
- The actual request data/command.
- A Response Topic: A unique topic string where the client expects the server to send the reply. This is often specific to the client or even the request instance.
- A Correlation ID: A unique identifier (e.g., a UUID, a timestamp, or a sequence number) generated by the client. This ID is included in both the request and the subsequent response, allowing the client to match incoming responses to its original requests, especially if it has multiple pending requests.
- Processing: A server application is subscribed to the request topic. Upon receiving a request, it processes it.
- Response: The server publishes the response message to the Response Topic specified in the request message. This response message must include the same Correlation ID from the request.
Topic Structure Example:
- Client Device ID: esp32_sensor_A01
- Request Topic (client publishes to): devices/esp32_sensor_A01/commands/get_config/request
  - Payload: {"correlation_id": "req123", "response_topic": "devices/esp32_sensor_A01/commands/get_config/response/req123"}
- Response Topic (client subscribes to, server publishes to): devices/esp32_sensor_A01/commands/get_config/response/req123
  - Payload: {"correlation_id": "req123", "config_data": {"param1": "value1"}, "status": "success"}
Element | Description | Importance | Example Value / Structure |
---|---|---|---|
Request Topic | A well-defined topic where the server listens for incoming requests. | Standardizes where clients send requests for a particular service. | server/actions/calculate_sum/request |
Response Topic (in Request) | A topic string, included in the request message payload, where the client expects the server to publish the response. Often dynamically generated by the client. | Directs the server where to send the specific reply. Enables client-specific or request-specific response channels. | clients/esp32_XYZ/rpc_response/corr789 |
Correlation ID (in Request & Response) | A unique identifier generated by the client, included in both the request and response message payloads. | Essential for the client to match an incoming asynchronous response to its original outgoing request, especially with multiple pending requests. | "corr_id": "uuid_123e4567-e89b-12d3-a456-426614174000" |
Request Payload | The actual data or command parameters for the request, along with the response topic and correlation ID. | Carries the necessary information for the server to process the request. | {"correlation_id": "c1", "response_topic": "cl/res/c1", "operands": [10, 5]} |
Response Payload | The result of the server’s processing, including the original correlation ID and a status indicator. | Delivers the outcome back to the client and allows for status checking. | {"correlation_id": "c1", "result": 15, "status": "success"} |
Client-Side Subscription | The client must subscribe to the unique response topic it specified (or a pattern matching it) *before* sending the request, ideally after the broker has acknowledged the subscription. | Necessary to receive the server’s reply. A response published before the subscription is active is not delivered to the client. | Client subscribes to clients/esp32_XYZ/rpc_response/corr789 |
Timeout Handling (Client-Side) | The requesting client must implement a timeout mechanism to handle cases where a response is not received within an expected timeframe. | Prevents indefinite blocking and allows for error recovery or alternative actions if the server is unresponsive or the message is lost. | Client waits for 10 seconds; if no response, logs error and proceeds. |
Server-Side Logic | The server application subscribes to the request topic, extracts response topic and correlation ID, processes the request, and publishes the response. | Implements the “other half” of the pattern. | Server listens on server/actions/calculate_sum/request. |
Key Elements:
- Dedicated Response Topics: Clients often subscribe to a unique response topic, possibly incorporating their client ID and the correlation ID to ensure they only receive their own responses. Alternatively, a more general response topic can be used if the client filters responses by correlation ID in the payload.
- Correlation ID: Essential for matching asynchronous responses to requests.
- Timeout Handling: The requesting client must implement a timeout mechanism. If a response isn’t received within a certain period, it should assume the request failed or was lost.
sequenceDiagram
    actor ESP32_Client as ESP32 Client
    participant Server_App as Server Application
    participant MQTT_Broker as MQTT Broker
    ESP32_Client->>ESP32_Client: 1. Generate Correlation ID (e.g., "xyz789")
    ESP32_Client->>ESP32_Client: 2. Define Response Topic (e.g., "client/esp32_A/response/xyz789")
    ESP32_Client->>MQTT_Broker: 3. Subscribe to "client/esp32_A/response/xyz789"
    MQTT_Broker-->>ESP32_Client: 4. SUBACK
    ESP32_Client->>MQTT_Broker: 5. Publish Request to "server/commands/fetch_data"<br>Payload: {"corr_id": "xyz789",<br> "resp_topic": "client/esp32_A/response/xyz789",<br> "params": ...}
    MQTT_Broker->>Server_App: 6. Deliver Request Message
    Server_App->>Server_App: 7. Process Request (extracts corr_id, resp_topic)
    Server_App->>MQTT_Broker: 8. Publish Response to "client/esp32_A/response/xyz789"<br>Payload: {"corr_id": "xyz789",<br> "data": ..., "status": "ok"}
    MQTT_Broker->>ESP32_Client: 9. Deliver Response Message
    ESP32_Client->>ESP32_Client: 10. Receive Message, Check corr_id ("xyz789")
    ESP32_Client->>ESP32_Client: 11. Process Response Data
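The client-side bookkeeping behind correlation IDs and timeouts can be sketched in plain C, independent of any MQTT library. This is a minimal sketch, not part of the chapter's ESP-IDF example: the names pending_add, pending_complete, and pending_expire, the table size, and the millisecond timestamps passed in by the caller are all illustrative assumptions.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define MAX_PENDING 4   /* assumed limit on simultaneous requests */
#define CORR_ID_LEN 24  /* assumed maximum correlation ID size, incl. NUL */

/* One outstanding request: its correlation ID and when it expires. */
typedef struct {
    bool     in_use;
    char     corr_id[CORR_ID_LEN];
    uint32_t deadline_ms;
} pending_request_t;

static pending_request_t pending[MAX_PENDING];

/* Record a new request. Returns false if the table is full. */
bool pending_add(const char *corr_id, uint32_t now_ms, uint32_t timeout_ms)
{
    for (int i = 0; i < MAX_PENDING; i++) {
        if (!pending[i].in_use) {
            pending[i].in_use = true;
            snprintf(pending[i].corr_id, CORR_ID_LEN, "%s", corr_id);
            pending[i].deadline_ms = now_ms + timeout_ms;
            return true;
        }
    }
    return false;
}

/* Called when a response arrives. Returns true (and frees the slot) only
 * if the correlation ID belongs to a request that is still pending. */
bool pending_complete(const char *corr_id)
{
    for (int i = 0; i < MAX_PENDING; i++) {
        if (pending[i].in_use && strcmp(pending[i].corr_id, corr_id) == 0) {
            pending[i].in_use = false;
            return true;
        }
    }
    return false;
}

/* Drop requests whose deadline has passed. Returns how many timed out. */
int pending_expire(uint32_t now_ms)
{
    int expired = 0;
    for (int i = 0; i < MAX_PENDING; i++) {
        /* Signed difference stays correct across counter wraparound. */
        if (pending[i].in_use &&
            (int32_t)(now_ms - pending[i].deadline_ms) >= 0) {
            pending[i].in_use = false;
            expired++;
        }
    }
    return expired;
}
```

In use, a client would call pending_add just before publishing, pending_complete from its message handler with the correlation ID found in the response payload, and pending_expire periodically. A response whose ID is unknown or already expired is simply rejected, which also guards against late or duplicated replies.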
2. Fan-out Pattern (One-to-Many)
Concept:
A single publisher sends a message that needs to be received and processed by multiple, potentially many, subscribers simultaneously. This is a natural fit for MQTT’s pub/sub model.
Use Cases:
- Broadcasting a command to all devices of a certain type (e.g., “all lights in building A, turn off”).
- Distributing configuration updates to a group of devices.
- Sending notifications or alerts to multiple monitoring applications.
Implementation:
- A publisher sends a message to a specific topic (e.g., building_A/lights/command/set_state).
- Multiple subscriber clients (e.g., individual light controllers) are all subscribed to this topic.
- The MQTT broker delivers the message to all subscribed clients.
graph TD
    subgraph Publisher_Side [Single Publisher]
        P1["<b>Central Controller / App</b><br>Publishes to Topic:<br><i>building_A/lights/command/set_state</i><br>Payload: {#quot;state#quot;: #quot;off#quot;}"]
    end
    subgraph MQTT_Broker_FanOut [MQTT Broker]
        B["<b>MQTT Broker</b>"]
    end
    subgraph Subscribers_Side [Multiple Subscribers]
        S1["<b>Light Controller 1 (ESP32)</b><br>Subscribed to:<br><i>building_A/lights/command/set_state</i>"]
        S2["<b>Light Controller 2 (ESP32)</b><br>Subscribed to:<br><i>building_A/lights/command/set_state</i>"]
        S3["<b>Light Controller N (ESP32)</b><br>Subscribed to:<br><i>building_A/lights/command/set_state</i>"]
        S4["... (Other subscribers to the same topic) ..."]
    end
    P1 -- Message --> B;
    B -- Delivers Copy 1 --> S1;
    B -- Delivers Copy 2 --> S2;
    B -- Delivers Copy N --> S3;
    B -- Delivers Copy ... --> S4;
    S1 -- Acts on Command --> R1["Light 1 Turns Off"];
    S2 -- Acts on Command --> R2["Light 2 Turns Off"];
    S3 -- Acts on Command --> R3["Light N Turns Off"];
    classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px,font-family:'Open Sans',color:#333;
    classDef primaryNode fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6;
    classDef processNode fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF;
    classDef successNode fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46;
    class P1 primaryNode;
    class B processNode;
    class S1,S2,S3,S4 primaryNode;
    class R1,R2,R3 successNode;
Example Topic: commands/group_alpha/devices/reboot
All devices belonging to group_alpha would subscribe to this topic and reboot upon receiving a message.
3. Fan-in Pattern (Many-to-One)
Concept:
Multiple publishers send messages that are all consumed and processed by a single (or a few) subscriber(s).
Use Cases:
- Multiple sensors publishing their readings, with a central application subscribing to aggregate, analyze, or store the data.
- Multiple devices reporting their status to a central monitoring dashboard.
- Collecting votes or responses from many clients.
Implementation:
- Multiple publisher clients (e.g., sensor_01, sensor_02, sensor_03) publish messages to topics that can be captured by a common subscription filter.
  - sensor_01 publishes to telemetry/zone_A/sensor_01/temperature
  - sensor_02 publishes to telemetry/zone_A/sensor_02/temperature
- A central subscriber application subscribes using a wildcard topic filter (e.g., telemetry/zone_A/+/temperature).
- The broker delivers messages from all matching publishers to the subscriber. The subscriber can differentiate message sources by inspecting the full topic string (available in the received message event) or by including a device ID in the message payload.
graph TD
    subgraph Publishers_Side [Multiple Publishers]
        P1["<b>Sensor 01 (ESP32)</b><br>Publishes to:<br><i>telemetry/zone_A/sensor_01/temperature</i><br>Payload: {#quot;temp#quot;: 22.5}"]
        P2["<b>Sensor 02 (ESP32)</b><br>Publishes to:<br><i>telemetry/zone_A/sensor_02/temperature</i><br>Payload: {#quot;temp#quot;: 23.1}"]
        P3["<b>Sensor N (ESP32)</b><br>Publishes to:<br><i>telemetry/zone_A/sensor_N/temperature</i><br>Payload: {#quot;temp#quot;: 21.9}"]
    end
    subgraph MQTT_Broker_FanIn [MQTT Broker]
        B["<b>MQTT Broker</b>"]
    end
    subgraph Subscriber_Side [Single Central Subscriber]
        S1["<b>Central Application / Aggregator</b><br>Subscribed to Wildcard Topic:<br><i>telemetry/zone_A/+/temperature</i>"]
    end
    P1 -- Message 1 --> B;
    P2 -- Message 2 --> B;
    P3 -- Message N --> B;
    B -- Delivers Message 1 (from P1) --> S1;
    B -- Delivers Message 2 (from P2) --> S1;
    B -- Delivers Message N (from P3) --> S1;
    S1 -- Processes Data --> R1["Aggregated Data Store / Dashboard<br>- Sensor 01: 22.5 C<br>- Sensor 02: 23.1 C<br>- Sensor N: 21.9 C"];
    classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px,font-family:'Open Sans',color:#333;
    classDef primaryNode fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6;
    classDef processNode fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF;
    classDef successNode fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46;
    class P1,P2,P3 primaryNode;
    class B processNode;
    class S1 primaryNode;
    class R1 successNode;
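To make the wildcard matching and source identification in the fan-in pattern concrete, here is a small sketch in plain C. The broker performs the real matching; a subscriber might still want equivalent logic to route messages internally. The function names topic_matches and topic_level are hypothetical, and the matcher is deliberately simplified: it does not validate filters and ignores the special rules for topics beginning with $.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Returns true if `topic` matches `filter`, honoring '+' (exactly one
 * level) and a trailing '#' (all remaining levels). */
bool topic_matches(const char *filter, const char *topic)
{
    while (*filter != '\0') {
        if (*filter == '#') {
            return true;                 /* '#' swallows everything left */
        }
        if (*filter == '+') {
            while (*topic != '\0' && *topic != '/') {
                topic++;                 /* skip exactly one topic level */
            }
            filter++;
            continue;
        }
        if (*topic == '\0' && strcmp(filter, "/#") == 0) {
            return true;                 /* "a/#" also matches "a" itself */
        }
        if (*filter != *topic) {
            return false;
        }
        filter++;
        topic++;
    }
    return *topic == '\0';
}

/* Copies the topic level at zero-based `index` into `out`.
 * Returns false if the level does not exist or does not fit. */
bool topic_level(const char *topic, int index, char *out, size_t out_len)
{
    const char *start = topic;
    for (int i = 0; i < index; i++) {
        start = strchr(start, '/');
        if (start == NULL) {
            return false;
        }
        start++;                         /* step past the separator */
    }
    size_t len = strcspn(start, "/");
    if (len >= out_len) {
        return false;
    }
    memcpy(out, start, len);
    out[len] = '\0';
    return true;
}
```

With the chapter's topic layout, the sensor ID is level 2 of telemetry/zone_A/sensor_02/temperature, so an aggregator could call topic_level(topic, 2, id, sizeof(id)) to recover "sensor_02" without needing a device ID in the payload.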
4. Message Sequencing
Concept:
Message sequencing is needed when a task involves multiple messages that must be processed in a specific order, or when a single large piece of data is split into multiple smaller messages.
Implementation:
- Include sequencing information within the message payload:
  - Sequence Number: A simple integer incremented for each message in a sequence.
  - Total Message Count: If the total number of messages in a sequence is known beforehand.
  - Message ID / Chunk ID: An identifier for the current message/chunk.
  - Transaction ID: An ID that groups all messages belonging to the same sequence or transaction.
Sequencing Element (in Payload) | Description | Purpose | Example Value |
---|---|---|---|
Transaction ID / Group ID | A unique identifier that groups all messages belonging to the same overall sequence or logical transaction. | Allows the receiver to associate multiple related messages, even if they arrive interleaved with messages from other transactions. | "txn_log_upload_1678886400" |
Sequence Number / Chunk Number | An integer indicating the order of the current message or chunk within its transaction/group. Typically starts from 0 or 1. | Enables the receiver to reassemble messages in the correct order if they arrive out of sequence. | "seq_num": 3 |
Total Messages / Total Chunks | An integer indicating the total number of messages or chunks expected in the current transaction/group. (Optional, but helpful if known). | Helps the receiver determine when it has received all parts of a sequence. Useful for pre-allocating buffers or detecting completeness. | "total_chunks": 10 |
Message ID / Chunk ID | A unique identifier for the specific message or chunk itself, distinct from the sequence number. (Less common if sequence number and transaction ID are used). | Can be used for acknowledging receipt of individual chunks or for more granular tracking. | "chunk_id": "part_C_of_log_XYZ" |
End-of-Sequence Flag | A boolean flag indicating if the current message is the last one in the sequence. (Alternative or complementary to Total Messages). | Provides an explicit signal that the sequence is complete. | "is_last_chunk": true |
Example Payload for a Chunked Message (chunk 2 of 5):
{
  "transaction_id": "txn_abc",
  "chunk_number": 2,
  "total_chunks": 5,
  "data_payload": "..."
}
The receiving application needs to buffer incoming messages and reassemble them in the correct order based on the sequence information before processing the complete data. This adds complexity to the subscriber.
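The buffering and reassembly described above might look like the following sketch in plain C. It assumes chunk numbers start at 1, that the transaction ID, chunk number, and total have already been parsed out of the payload, and that small fixed limits (MAX_CHUNKS, CHUNK_SIZE) are acceptable. All names here are hypothetical and not taken from any library.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHUNKS 8    /* assumed upper bound on chunks per transaction */
#define CHUNK_SIZE 32   /* assumed upper bound on bytes per chunk */

/* Reassembly state for one transaction. Chunks are numbered from 1. */
typedef struct {
    char   txn_id[24];
    int    total_chunks;
    int    received;
    bool   have[MAX_CHUNKS];
    size_t len[MAX_CHUNKS];
    char   data[MAX_CHUNKS][CHUNK_SIZE];
} reassembly_t;

/* Starts a new transaction. Returns false if the chunk count is unusable. */
bool reassembly_begin(reassembly_t *r, const char *txn_id, int total_chunks)
{
    memset(r, 0, sizeof(*r));
    if (total_chunks < 1 || total_chunks > MAX_CHUNKS) {
        return false;
    }
    strncpy(r->txn_id, txn_id, sizeof(r->txn_id) - 1);
    r->total_chunks = total_chunks;
    return true;
}

/* Stores one chunk, in any arrival order. Chunks from other transactions,
 * out-of-range chunks, and duplicates are ignored.
 * Returns true once every chunk has arrived. */
bool reassembly_add(reassembly_t *r, const char *txn_id, int chunk_number,
                    const char *payload, size_t payload_len)
{
    bool valid = strcmp(txn_id, r->txn_id) == 0 &&
                 chunk_number >= 1 && chunk_number <= r->total_chunks &&
                 payload_len <= CHUNK_SIZE;
    if (valid && !r->have[chunk_number - 1]) {
        memcpy(r->data[chunk_number - 1], payload, payload_len);
        r->len[chunk_number - 1] = payload_len;
        r->have[chunk_number - 1] = true;
        r->received++;
    }
    return r->total_chunks > 0 && r->received == r->total_chunks;
}

/* Concatenates the chunks in order into `out` (NUL-terminated).
 * Returns the assembled length, or 0 if incomplete or too large. */
size_t reassembly_finish(const reassembly_t *r, char *out, size_t out_len)
{
    size_t used = 0;
    if (r->total_chunks == 0 || r->received != r->total_chunks) {
        return 0;
    }
    for (int i = 0; i < r->total_chunks; i++) {
        if (used + r->len[i] >= out_len) {
            return 0;
        }
        memcpy(out + used, r->data[i], r->len[i]);
        used += r->len[i];
    }
    out[used] = '\0';
    return used;
}
```

Ignoring duplicates matters in practice because QoS 1 delivery may hand the same chunk to the subscriber more than once. A production version would also need a timeout to discard transactions that never complete.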
5. Correlation ID (Revisited)
As seen in the Request/Response pattern, a Correlation ID is a unique identifier used to link related messages in an asynchronous workflow.
- Use Cases:
- Matching responses to requests.
- Tracking a piece of data or a command as it flows through multiple processing stages or services connected via MQTT.
- Grouping log messages related to a single operation.
- Generation: Can be a UUID, a sufficiently random string, a timestamp combined with a client ID, or an incrementing number (if scope is limited).
- Propagation: Each component in the workflow that processes or forwards the message should carry the same Correlation ID.
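As one illustration of the "client ID plus incrementing number" option, the sketch below formats a correlation ID from a client ID and a per-client counter. The function name make_correlation_id and the ID layout are assumptions made for this example; a UUID or a random value would serve equally well where uniqueness across restarts matters.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Writes "<client_id>-<counter as 8 hex digits>" into buf.
 * Returns the counter value to use for the next request. */
uint32_t make_correlation_id(char *buf, size_t len,
                             const char *client_id, uint32_t counter)
{
    snprintf(buf, len, "%s-%08lx", client_id, (unsigned long)counter);
    return counter + 1;
}
```

Because the counter restarts at its initial value after a reboot, IDs generated this way are only unique within one power cycle. Combining the counter with a boot-time random value is a common way to widen that scope.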
6. Shared Subscriptions (MQTT v5.0 Feature)
Concept (MQTT v5.0):
In standard MQTT (v3.1.1), if multiple clients subscribe to the same topic, each client receives a copy of every message published to that topic. This is ideal for fan-out but not for distributing workload among a group of identical worker applications.
Shared Subscriptions (introduced in MQTT v5.0) allow multiple subscribing clients to share a single subscription to a topic filter. When a message is published to a topic matching this shared subscription, the broker delivers the message to only one of the subscribing clients in the shared group, typically using a round-robin or other load distribution strategy.
Topic Syntax (MQTT v5.0): $share/{ShareName}/{topic_filter}
- $share: Keyword indicating a shared subscription.
- {ShareName}: A string that identifies the group of subscribers sharing the subscription. All clients using the same ShareName for the same topic_filter are part of the same shared group.
- {topic_filter}: The actual topic filter they are interested in.
Example (MQTT v5.0):
Three worker applications want to process incoming tasks published to tasks/new_jobs/#.
- Worker 1 subscribes to: $share/job_processors/tasks/new_jobs/#
- Worker 2 subscribes to: $share/job_processors/tasks/new_jobs/#
- Worker 3 subscribes to: $share/job_processors/tasks/new_jobs/#
When a message is published to tasks/new_jobs/image_processing, only one of the three workers will receive it.
graph TD
    subgraph Publisher_App [Publisher Application]
        P1["<b>Publisher</b><br>Publishes Message M1 to:<br><i>tasks/new_jobs/image_processing</i>"]
    end
    subgraph MQTT_Broker_Shared [MQTT v5.0 Broker]
        B["<b>MQTT v5.0 Broker</b><br>Understands Shared Subscriptions"]
    end
    subgraph Shared_Subscriber_Group [Shared Subscriber Group: job_processors]
        direction LR
        W1["<b>Worker 1 (Client)</b><br>Subscribed to:<br><i>$share/job_processors/tasks/new_jobs/#</i>"]
        W2["<b>Worker 2 (Client)</b><br>Subscribed to:<br><i>$share/job_processors/tasks/new_jobs/#</i>"]
        W3["<b>Worker 3 (Client)</b><br>Subscribed to:<br><i>$share/job_processors/tasks/new_jobs/#</i>"]
    end
    P1 -- Message M1 --> B;
    B -- Broker Distributes M1<br>to ONE client in the group --> W1;
    W1 -- Processes M1 --> R1["Worker 1 Processes M1"];
    %% Illustrate next message going to a different worker
    P2["<b>Publisher</b><br>Publishes Message M2 to:<br><i>tasks/new_jobs/data_analysis</i>"] -- Message M2 --> B;
    B -- Broker Distributes M2<br>to ONE client in the group<br>(e.g., Round Robin) --> W2;
    W2 -- Processes M2 --> R2["Worker 2 Processes M2"];
    classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px,font-family:'Open Sans',color:#333;
    classDef primaryNode fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6;
    classDef processNode fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF;
    classDef decisionNode fill:#FEF3C7,stroke:#D97706,stroke-width:1px,color:#92400E;
    classDef successNode fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46;
    class P1 primaryNode;
    class P2 primaryNode;
    class B processNode;
    class W1,W2,W3 primaryNode;
    class R1,R2 successNode;
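The $share/{ShareName}/{topic_filter} syntax is plain string composition, which the following sketch illustrates in C. The helper names build_shared_filter and parse_shared_filter are hypothetical. The check that a share name contains no '/', '+', or '#' reflects the MQTT v5.0 rule for ShareName; everything else is a simplification.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Writes "$share/<share_name>/<topic_filter>" into out. Returns false if
 * the share name is empty, contains '/', '+', or '#', or the result
 * does not fit in the buffer. */
bool build_shared_filter(char *out, size_t out_len,
                         const char *share_name, const char *topic_filter)
{
    if (share_name[0] == '\0' || strpbrk(share_name, "/+#") != NULL) {
        return false;
    }
    int n = snprintf(out, out_len, "$share/%s/%s", share_name, topic_filter);
    return n > 0 && (size_t)n < out_len;
}

/* Splits a shared-subscription string. On success, copies the share name
 * into `share_name` and returns a pointer to the underlying topic filter
 * inside `subscription`; otherwise returns NULL. */
const char *parse_shared_filter(const char *subscription,
                                char *share_name, size_t name_len)
{
    static const char prefix[] = "$share/";
    size_t prefix_len = sizeof(prefix) - 1;

    if (strncmp(subscription, prefix, prefix_len) != 0) {
        return NULL;                     /* not a shared subscription */
    }
    const char *name = subscription + prefix_len;
    const char *slash = strchr(name, '/');
    if (slash == NULL || slash == name || slash[1] == '\0') {
        return NULL;                     /* missing name or filter */
    }
    size_t len = (size_t)(slash - name);
    if (len >= name_len) {
        return NULL;
    }
    memcpy(share_name, name, len);
    share_name[len] = '\0';
    return slash + 1;
}
```

Note that publishers never use the $share prefix: they publish to tasks/new_jobs/image_processing as usual, and only the subscribing workers include the prefix and group name in their subscription.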
Benefits:
- Load Balancing: Distributes message processing load across multiple instances of an application.
- High Availability: If one worker instance fails, others in the group can continue processing messages.
ESP-IDF and MQTT v3.1.1 Context:
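The esp-mqtt client in ESP-IDF uses MQTT v3.1.1 by default, and the examples in this chapter connect that way. (ESP-IDF v5.0 and later also include optional MQTT v5.0 support, which must be enabled in the project configuration.) Shared Subscriptions are an MQTT v5.0 feature; they are not part of the v3.1.1 protocol and are not natively available to a client connecting as v3.1.1.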
- If you connect an ESP32 (using esp-mqtt as a v3.1.1 client) to an MQTT v5.0 broker, the ESP32 client cannot use the $share/... syntax directly to initiate a shared subscription.
- Workarounds/Alternatives with v3.1.1 clients:
  - Broker-Side Configuration: Some MQTT v5.0 brokers might offer mechanisms to configure shared subscription behavior for v3.1.1 clients through administrative settings or specific non-standard topic structures, but this is broker-dependent.
  - Dispatcher Pattern: A dedicated v3.1.1 client (a dispatcher) subscribes to the main topic. This dispatcher then re-distributes messages to a pool of worker clients (which could be other ESP32s or server applications) using individual topics or other means. This adds complexity.
  - Topic Partitioning: Manually partition the topic space (e.g., tasks/partition1/new_jobs, tasks/partition2/new_jobs) and have different worker clients subscribe to different partitions.
Workaround / Alternative | Description | How it Works (with v3.1.1 Clients) | Complexity / Considerations |
---|---|---|---|
Broker-Side Configuration (Non-Standard) | Some MQTT v5.0 brokers might offer administrative features to treat subscriptions from v3.1.1 clients on certain topics as if they were part of a shared group. | The broker intercepts messages and distributes them to one of the v3.1.1 clients in a configured pool. Client is unaware. | Highly broker-dependent. Not a standard MQTT v3.1.1 feature. Requires specific broker support and configuration. |
Dispatcher Pattern (Application-Level) | A dedicated v3.1.1 client (the “dispatcher”) subscribes to the main topic. This dispatcher then implements logic to re-distribute received messages to a pool of worker clients. | Dispatcher receives all messages. It then forwards each message to one available worker client (e.g., via unique topics like workers/worker_1/task, or using a queue). | Adds an extra hop and a point of failure (the dispatcher). Requires custom logic for distribution, worker availability tracking, and potentially message queuing. ESP32s could be workers. |
Topic Partitioning (Manual Load Balancing) | The main topic space is manually divided into several sub-topics (partitions). Different worker clients subscribe to different, distinct partitions. | Publisher might need to decide which partition to send a message to, or messages are distributed to partitions round-robin. Each worker gets messages only from its partition. E.g., Worker A subscribes to tasks/partition_A/#, Worker B to tasks/partition_B/#. | Requires careful topic design. Load might not be evenly distributed if partitions receive uneven traffic. Less flexible than true shared subscriptions. |
External Queue with MQTT Integration | Messages are published to a topic, then an intermediary service (could be a v5.0 client or a bridge) consumes them and puts them into an external message queue (e.g., RabbitMQ, SQS). Worker clients (any protocol) consume from this queue. | Decouples MQTT from the worker pool. MQTT is used for ingestion. The queue handles load balancing and persistence. | Introduces another system (the message queue) into the architecture. Workers might not use MQTT directly. |
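For the topic partitioning workaround, a publisher needs some deterministic rule for choosing a partition. One option, sketched below in C, is to hash a job key and take the result modulo the number of partitions. The function name partition_topic, the use of the FNV-1a hash, and the topic layout tasks/partitionN/new_jobs (borrowed from the example above) are assumptions for illustration only.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* FNV-1a hash: small, deterministic, adequate for spreading keys. */
static uint32_t fnv1a(const char *s)
{
    uint32_t h = 2166136261u;
    while (*s != '\0') {
        h ^= (uint8_t)*s++;
        h *= 16777619u;
    }
    return h;
}

/* Chooses a partition in 1..num_partitions for `job_key` and writes the
 * matching topic into `out`. Returns the partition number, or 0 if
 * num_partitions is 0 (in which case `out` is left untouched). */
unsigned partition_topic(char *out, size_t out_len,
                         const char *job_key, unsigned num_partitions)
{
    if (num_partitions == 0) {
        return 0;
    }
    unsigned partition = (unsigned)(fnv1a(job_key) % num_partitions) + 1;
    snprintf(out, out_len, "tasks/partition%u/new_jobs", partition);
    return partition;
}
```

Hashing on a key keeps all messages for the same job on the same worker, which round-robin assignment does not. The trade-off noted in the table still applies: if a few keys dominate the traffic, their partitions will be busier than the rest.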
It’s important for ESP32 developers to be aware of Shared Subscriptions if designing systems that will interact with MQTT v5.0 brokers, even if the ESP32 itself is a v3.1.1 client, as it affects how backend subscriber pools might be designed.
7. Error Handling and Status Reporting
In asynchronous systems, clear error reporting is vital.
Strategy | Description | Implementation Notes | Example Use Case / Payload Snippet |
---|---|---|---|
Status in Response (Request/Response) | Include a dedicated status field (e.g., “success”, “error”, “pending”) and optional error details within the response payload of a request/response pattern. | Server includes these fields. Client checks them upon receiving the response. | Payload: {"correlation_id": "c1", "status": "error", "error_code": 4001, "message": "Invalid parameter 'speed'"} |
Dedicated Error Topics | For commands or operations that don’t have a direct synchronous-like response, the processing entity publishes error details to a specific, well-known error topic. | The original requester (if aware) or a monitoring system subscribes to this error topic. Include correlation ID if possible. | Topic: devices/esp32_A/commands/set_config/errors Payload: {"original_corr_id": "req789", "error": "Config update failed: checksum mismatch"} |
Last Will and Testament (LWT) | An MQTT feature where the client pre-configures a message with the broker that is published automatically by the broker if the client disconnects ungracefully. | Client sets Will Flag, Will Topic, Will Message, Will QoS, Will Retain during connection. Often used to report “offline” status. | LWT Topic: devices/esp32_A/status LWT Payload: {"state": "offline", "reason": "lwt"} (often retained) |
Periodic Status Publishing | Devices regularly publish their current operational status, health metrics, or heartbeat to a dedicated status topic. | Can be published as a retained message so new subscribers always get the latest status. Include device ID, timestamp, and relevant metrics. | Topic: devices/esp32_A/heartbeat_status (retained)Payload: {"timestamp": 1678886400, "wifi_rssi": -65, "uptime_sec": 7200, "state": "running"} |
Negative Acknowledgements (NACKs) | If a message is received but cannot be processed correctly, the receiver explicitly publishes a NACK message, possibly to a reply topic or a dedicated NACK topic. | Similar to error in response, but can be used for any subscribed message, not just direct responses. Should include original message identifiers. | Topic: source_app/data_ingest/nack Payload: {"original_msg_id": "m556", "reason": "Payload schema validation failed"} |
Dead Letter Topics / Queues (Broker Feature) | Some advanced brokers can be configured to move messages that cannot be delivered or processed after certain attempts to a “dead letter topic/queue” for later inspection. | This is primarily a broker-side configuration. Useful for catching unhandled messages. | Broker routes undeliverable messages from critical/alerts to critical/alerts/dead_letter . |
- Request/Response: The response message should include a status field (e.g., {"status": "success"} or {"status": "error", "error_message": "Invalid parameters"}).
- Dedicated Error Topics: For commands that don’t have a direct response path, the processing entity could publish error details to a specific error topic that monitoring systems or the original requester (if it knows to listen) can subscribe to.
  - Example: For a command sent to devices/esp32_A/set_config, on error the server publishes to devices/esp32_A/set_config/errors with payload {"correlation_id": "...", "error_code": 500, "details": "..."}.
- Last Will and Testament (LWT): As covered previously, for ungraceful disconnections.
- Status Topics: Devices can periodically publish their operational status to a dedicated status topic (often retained).
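The status-in-response strategy comes down to agreeing on a small payload shape. The sketch below shows one way a responder might format such a payload in plain C. The function build_status_payload and the status_t enum are hypothetical, the field names follow the table above, and the message text is inserted without JSON escaping, so it must not contain double quotes or backslashes.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef enum { STATUS_SUCCESS, STATUS_ERROR, STATUS_PENDING } status_t;

/* Maps a status value to the string used in the payload. */
static const char *status_name(status_t s)
{
    switch (s) {
    case STATUS_SUCCESS: return "success";
    case STATUS_PENDING: return "pending";
    default:             return "error";
    }
}

/* Formats a status response. Error responses additionally carry
 * error_code and message; the other statuses ignore both arguments.
 * Returns false if the payload does not fit in `out`. */
bool build_status_payload(char *out, size_t out_len, const char *corr_id,
                          status_t status, int error_code,
                          const char *message)
{
    int n;
    if (status == STATUS_ERROR) {
        n = snprintf(out, out_len,
                     "{\"correlation_id\":\"%s\",\"status\":\"error\","
                     "\"error_code\":%d,\"message\":\"%s\"}",
                     corr_id, error_code, message);
    } else {
        n = snprintf(out, out_len,
                     "{\"correlation_id\":\"%s\",\"status\":\"%s\"}",
                     corr_id, status_name(status));
    }
    return n > 0 && (size_t)n < out_len;
}
```

On an ESP32, the same payload would more likely be built with cJSON, as in the practical example that follows, which handles escaping automatically. The point here is only that every response carries both the correlation ID and an explicit status, so the requester never has to infer failure from silence alone.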
Practical Examples
Of the patterns above, Request/Response is the one in which an ESP32 client most often has to manage state itself, so it is the focus of the practical example.
Example: ESP32 Requesting Configuration from a Server
The ESP32 will request its configuration from a server. It will:
- Generate a unique correlation ID.
- Construct a unique response topic.
- Subscribe to the response topic.
- Publish the request (including correlation ID and response topic) to a server’s command topic.
- Wait for a response on its unique response topic, with a timeout.
- Process the response or handle the timeout.
Server-Side (Conceptual – e.g., a Python script using Paho-MQTT):
The server would:
- Subscribe to server/config_requests/#.
- On receiving a message, parse correlation_id and response_topic.
- Fetch/generate configuration.
- Publish the configuration to the received response_topic with the same correlation_id.
ESP32 Code Snippet (ESP-IDF v5.x):
#include <stdio.h>
#include <stdlib.h>   // For free()
#include <stdbool.h>  // For bool
#include <string.h>
#include <inttypes.h> // For PRIx32
#include "esp_log.h"
#include "esp_wifi.h"
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h" // For semaphores
#include "mqtt_client.h"
#include "cJSON.h"
#include "esp_mac.h"
#include "esp_random.h" // For random correlation ID part

static const char *TAG = "REQ_RESP_EXAMPLE";

// MQTT Configuration
#define MQTT_BROKER_URI "mqtt://test.mosquitto.org"
#define DEVICE_ID_PREFIX "esp32_client_"
#define SERVER_REQUEST_TOPIC "server/config_requests/get"

// Buffers and state
char device_id_str[32];
char response_topic_str[128];
char current_correlation_id[20];
char received_config_payload[256]; // Buffer for response
esp_mqtt_client_handle_t client_handle = NULL;
SemaphoreHandle_t response_semaphore = NULL; // To signal response received
bool response_received_flag = false;
// --- Helper: Generate Correlation ID ---
static void generate_correlation_id(char *buffer, size_t len) {
    // esp_random() returns uint32_t, so use the matching PRIx32 format macro
    snprintf(buffer, len, "corr_%" PRIx32, esp_random());
}
// --- MQTT Event Handler ---
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
    esp_mqtt_event_handle_t event = event_data;
    client_handle = event->client;

    switch ((esp_mqtt_event_id_t)event_id) {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
        // Connection established, ready to send requests (triggered elsewhere)
        break;
    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
        break;
    case MQTT_EVENT_SUBSCRIBED:
        // event->topic is only populated for MQTT_EVENT_DATA, so a SUBACK is identified
        // by comparing event->msg_id with the value returned by esp_mqtt_client_subscribe().
        ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG, "MQTT_EVENT_DATA");
        ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
        ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data);
        // Check if this is data on our expected response topic. The topic is not
        // NUL-terminated, so compare lengths too; otherwise a prefix would match.
        if (event->topic != NULL &&
            (size_t)event->topic_len == strlen(response_topic_str) &&
            strncmp(event->topic, response_topic_str, event->topic_len) == 0) {
            cJSON *root = cJSON_ParseWithLength(event->data, event->data_len);
            if (root) {
                cJSON *corr_id_item = cJSON_GetObjectItemCaseSensitive(root, "correlation_id");
                if (cJSON_IsString(corr_id_item) && (strcmp(corr_id_item->valuestring, current_correlation_id) == 0)) {
                    ESP_LOGI(TAG, "Correlation ID matches! Processing response.");
                    // event->data is not NUL-terminated: copy at most data_len bytes
                    size_t copy_len = (size_t)event->data_len;
                    if (copy_len > sizeof(received_config_payload) - 1) {
                        copy_len = sizeof(received_config_payload) - 1;
                    }
                    memcpy(received_config_payload, event->data, copy_len);
                    received_config_payload[copy_len] = '\0';
                    response_received_flag = true;
                    if (response_semaphore != NULL) {
                        xSemaphoreGive(response_semaphore);
                    }
                } else {
                    ESP_LOGW(TAG, "Correlation ID mismatch or not found in response.");
                }
                cJSON_Delete(root);
            } else {
                ESP_LOGW(TAG, "Response payload is not valid JSON.");
            }
        }
        break;
    case MQTT_EVENT_ERROR:
        ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
        // This is a generic error and may be unrelated to a pending request, so the
        // response semaphore is left untouched here (taking it would swallow a
        // response signal). The requester's own timeout covers a lost request.
        // More sophisticated error handling might be needed.
        break;
    default:
        // ESP_LOGI(TAG, "Other event id:%d", event->event_id);
        break;
    }
}
// --- Function to Request Configuration ---
void request_device_configuration(void) {
if (!client_handle) {
ESP_LOGE(TAG, "MQTT client not initialized or not connected.");
return;
}
if (response_semaphore == NULL) {
response_semaphore = xSemaphoreCreateBinary();
}
// Ensure semaphore is taken before making a new request
xSemaphoreTake(response_semaphore, 0);
response_received_flag = false;
memset(received_config_payload, 0, sizeof(received_config_payload));
generate_correlation_id(current_correlation_id, sizeof(current_correlation_id));
snprintf(response_topic_str, sizeof(response_topic_str), "devices/%s/config_response/%s", device_id_str, current_correlation_id);
ESP_LOGI(TAG, "Subscribing to response topic: %s", response_topic_str);
int sub_msg_id = esp_mqtt_client_subscribe(client_handle, response_topic_str, 1);
if (sub_msg_id == -1) {
ESP_LOGE(TAG, "Failed to subscribe to response topic.");
return; // Cannot proceed without subscription
}
ESP_LOGI(TAG, "Subscribe to response topic sent, msg_id=%d. Waiting for SUBACK...", sub_msg_id);
// Robust code should wait for MQTT_EVENT_SUBSCRIBED for this subscription before publishing,
// e.g. by having the event handler signal this task when the SUBACK arrives.
// For simplicity, this example uses a fixed delay instead, which leaves a race if the SUBACK is slow.
vTaskDelay(pdMS_TO_TICKS(500)); // Fixed delay in the hope that the subscription completes
cJSON *req_payload_json = cJSON_CreateObject();
if (!req_payload_json) {
ESP_LOGE(TAG, "Failed to create JSON for request.");
esp_mqtt_client_unsubscribe(client_handle, response_topic_str); // Do not leave the response subscription behind
return;
}
cJSON_AddStringToObject(req_payload_json, "correlation_id", current_correlation_id);
cJSON_AddStringToObject(req_payload_json, "response_topic", response_topic_str);
// Add any other request parameters if needed
// cJSON_AddStringToObject(req_payload_json, "request_type", "full_config");
char *req_payload_str = cJSON_PrintUnformatted(req_payload_json);
cJSON_Delete(req_payload_json);
if (!req_payload_str) {
ESP_LOGE(TAG, "Failed to render request JSON to string.");
esp_mqtt_client_unsubscribe(client_handle, response_topic_str); // Do not leave the response subscription behind
return;
}
ESP_LOGI(TAG, "Publishing config request to %s", SERVER_REQUEST_TOPIC);
ESP_LOGI(TAG, "Request Payload: %s", req_payload_str);
int pub_msg_id = esp_mqtt_client_publish(client_handle, SERVER_REQUEST_TOPIC, req_payload_str, 0, 1, 0);
free(req_payload_str);
if (pub_msg_id == -1) {
ESP_LOGE(TAG, "Failed to publish request.");
// Unsubscribe from response topic as we won't get a response
esp_mqtt_client_unsubscribe(client_handle, response_topic_str);
return;
}
ESP_LOGI(TAG, "Config request published, msg_id=%d. Waiting for response...", pub_msg_id);
// Wait for the response with a timeout
if (xSemaphoreTake(response_semaphore, pdMS_TO_TICKS(10000)) == pdTRUE) { // 10-second timeout
if (response_received_flag) {
ESP_LOGI(TAG, "Configuration Response Received: %s", received_config_payload);
// TODO: Parse received_config_payload and apply configuration
} else {
// This case should ideally not happen if semaphore was given only on flag set.
ESP_LOGW(TAG, "Semaphore taken but no response flag set. Should not happen.");
}
} else {
ESP_LOGE(TAG, "Timeout waiting for configuration response on topic %s (CorrID: %s)", response_topic_str, current_correlation_id);
}
// Cleanup: Unsubscribe from the dynamic response topic
ESP_LOGI(TAG, "Unsubscribing from response topic: %s", response_topic_str);
esp_mqtt_client_unsubscribe(client_handle, response_topic_str);
// Note: Consider if the server might send a late response. How to handle?
// For this example, we unsubscribe. If a late response arrives, it will be ignored.
}
// --- Main Application Start ---
static void mqtt_app_start(void) {
uint8_t mac[6];
esp_read_mac(mac, ESP_MAC_WIFI_STA);
snprintf(device_id_str, sizeof(device_id_str), "%s%02X%02X%02X",
DEVICE_ID_PREFIX, mac[3], mac[4], mac[5]); // Shorter ID for topics
const esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = MQTT_BROKER_URI,
.credentials.client_id = device_id_str,
};
client_handle = esp_mqtt_client_init(&mqtt_cfg); // Assign to global client_handle
esp_mqtt_client_register_event(client_handle, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
esp_mqtt_client_start(client_handle);
// After connection, you would call request_device_configuration()
// For example, in a task or after a delay, or triggered by some event.
// This is a blocking call due to semaphore, so call from a task.
// Example: xTaskCreate(config_request_task, "cfg_req_task", 4096, NULL, 5, NULL);
// where config_request_task calls request_device_configuration() after MQTT connect.
}
// Simplified WiFi Init and app_main
// (Ensure NVS, WiFi init are present as in previous chapters)
// ... (wifi_init_sta() and app_main() similar to Chapter 108) ...
// In app_main, after WiFi and MQTT start, create a task that waits for connection
// and then calls request_device_configuration().
void config_request_task_entry(void *pvParameters) {
// Crude wait for the connection. A more robust way is to block on an event group or semaphore
// signaled from MQTT_EVENT_CONNECTED.
vTaskDelay(pdMS_TO_TICKS(5000));
// esp-mqtt provides no esp_mqtt_client_is_connected() call, so this task relies on a flag.
// Assumption: mqtt_connected_flag is a global `static volatile bool` declared above
// mqtt_event_handler, set in MQTT_EVENT_CONNECTED and cleared in MQTT_EVENT_DISCONNECTED.
if (client_handle && mqtt_connected_flag) {
ESP_LOGI(TAG, "MQTT connected, attempting to request configuration.");
request_device_configuration();
} else {
ESP_LOGE(TAG, "MQTT not connected, cannot request configuration.");
}
vTaskDelete(NULL);
}
// app_main would look something like:
// void app_main(void) {
// ... (NVS, WiFi init) ...
// mqtt_app_start();
// xTaskCreate(config_request_task_entry, "cfg_req_task", 4096, NULL, 5, NULL);
// }
Key aspects of the ESP32 code:
- Dynamically constructs response_topic_str using device ID and correlation ID.
- Uses a semaphore (response_semaphore) to block the requesting task until a response is received or a timeout occurs.
- The mqtt_event_handler checks if incoming data is on the expected response topic and if the correlation ID matches.
- It’s crucial to unsubscribe from the dynamic response topic after the transaction is complete (response received or timeout) to avoid accumulating unnecessary subscriptions.
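- Robustness Note: Waiting for MQTT_EVENT_SUBSCRIBED for the response-topic subscription before publishing the request is more reliable than a fixed delay. In esp-mqtt this event reports the message ID of the acknowledged SUBSCRIBE rather than the topic string, so this involves comparing event->msg_id in MQTT_EVENT_SUBSCRIBED with the value returned by esp_mqtt_client_subscribe() and then signaling the main task to proceed with publishing.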
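The gating described in the robustness note can be sketched as a small state tracker that is independent of any MQTT library. All names here (pending_request_t, request_begin, on_suback, on_response) are hypothetical and not part of esp-mqtt. The sketch assumes the SUBACK event reports the same message ID that the subscribe call returned, and that the two handlers would be called from the MQTT_EVENT_SUBSCRIBED and MQTT_EVENT_DATA cases respectively.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical request states; not part of esp-mqtt. */
typedef enum {
    REQ_IDLE,
    REQ_WAIT_SUBACK,   /* SUBSCRIBE sent, request not yet published */
    REQ_WAIT_RESPONSE, /* SUBACK seen, request published */
    REQ_DONE
} req_state_t;

typedef struct {
    req_state_t state;
    int sub_msg_id;          /* value returned by the subscribe call */
    char correlation_id[32];
} pending_request_t;

/* Start a transaction once the SUBSCRIBE has been sent. */
static void request_begin(pending_request_t *req, int sub_msg_id, const char *corr_id)
{
    req->state = REQ_WAIT_SUBACK;
    req->sub_msg_id = sub_msg_id;
    snprintf(req->correlation_id, sizeof(req->correlation_id), "%s", corr_id);
}

/* Returns true when the caller may now publish the request. */
static bool on_suback(pending_request_t *req, int msg_id)
{
    if (req->state != REQ_WAIT_SUBACK || msg_id != req->sub_msg_id) {
        return false; /* SUBACK belongs to some other subscription */
    }
    req->state = REQ_WAIT_RESPONSE;
    return true;
}

/* Returns true when the response belongs to the pending request. */
static bool on_response(pending_request_t *req, const char *corr_id)
{
    if (req->state != REQ_WAIT_RESPONSE ||
        strcmp(corr_id, req->correlation_id) != 0) {
        return false; /* early, stale, or foreign response */
    }
    req->state = REQ_DONE;
    return true;
}
```

In an ESP-IDF application, a true return from on_suback would typically be the point at which the event handler signals the requesting task (for example with a semaphore) so that it publishes only after the subscription is confirmed.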
Build Instructions
- Standard ESP-IDF project. Add the json component (which provides cJSON in ESP-IDF) to REQUIRES in main/CMakeLists.txt.
- Configure WiFi and MQTT Broker URI.
idf.py build
Run/Flash/Observe Steps
idf.py -p /dev/ttyUSB0 flash monitor
- Server Side: Have an MQTT client (e.g., Python script with Paho-MQTT, or MQTT Explorer):
- Subscribe to server/config_requests/get.
- When a message arrives, note the response_topic and correlation_id from its payload.
- Manually (or programmatically) publish a response JSON to that response_topic, including the same correlation_id. Example response: {"correlation_id": "corr_XXXX", "status": "success", "data": {"settingA": 123, "settingB": "enabled"}}
- ESP32 Logs:
- Observe the ESP32 subscribing to its dynamic response topic.
- Observe it publishing the request.
- When you publish the response from your server-side tool, observe the ESP32 receiving and processing it, or timing out if no response is sent.
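If the responder is scripted rather than operated by hand, the essential step in the server-side list above is echoing the request's correlation_id into the reply. The following is a minimal sketch in C, assuming the example settings are hard-coded. The helper name build_config_response is hypothetical, and the correlation ID is assumed to contain no characters that need JSON escaping.

```c
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes the example response for one request.
 * Returns the payload length, or -1 if the correlation ID is empty
 * or the buffer is too small.
 * Assumption: corr_id needs no JSON escaping. */
static int build_config_response(const char *corr_id, char *out, size_t out_size)
{
    if (strlen(corr_id) == 0) {
        return -1; /* a response without a correlation ID cannot be matched */
    }
    int n = snprintf(out, out_size,
                     "{\"correlation_id\":\"%s\",\"status\":\"success\","
                     "\"data\":{\"settingA\":123,\"settingB\":\"enabled\"}}",
                     corr_id);
    return (n < 0 || (size_t)n >= out_size) ? -1 : n;
}
```

The resulting string would then be published to the response_topic taken from the request payload.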
Variant Notes
The advanced messaging patterns discussed are generally application-level logic built on top of standard MQTT features. As such, their implementation and behavior are consistent across all ESP32 variants (ESP32, S2, S3, C3, C6, H2) when using ESP-IDF and the esp-mqtt client.
- Resource Usage: Patterns like Request/Response with dynamic subscriptions and correlation ID management will consume slightly more RAM (for topic strings, state) and CPU (for JSON parsing, string manipulation) than simple publish or subscribe. This is usually well within the capabilities of all ESP32 variants.
- Network Performance: Latency in Request/Response will be affected by network conditions and broker performance, irrespective of the ESP32 variant.
- Shared Subscriptions: As noted, this is an MQTT v5.0 feature. While ESP32s running esp-mqtt (v3.1.1 client) cannot initiate shared subscriptions directly, they can interact with systems where backend services use shared subscriptions on an MQTT v5.0 broker. The ESP32 remains a standard v3.1.1 publisher or subscriber in such cases.
The choice of ESP32 variant mainly affects how much data processing, or how demanding a requested task, the device can handle; the MQTT patterns themselves stay the same.
Common Mistakes & Troubleshooting Tips
Mistake / Issue (Pattern) | Symptom(s) | Troubleshooting / Solution |
---|---|---|
Request/Response: Correlation ID Mismatch or Missing | Client receives responses but cannot match them to original requests, or processes wrong data. Server might not include correlation ID, or client doesn’t check it. | Fix: 1. Client: Always generate a unique correlation ID for each request. Store it with pending request details. 2. Server: Always copy the correlation ID from the request into the corresponding response payload. 3. Client: Upon receiving a response, strictly validate its correlation ID against pending requests before processing. Log correlation IDs on both client and server for easier debugging. |
Request/Response: Not Subscribing to Response Topic Before Request | Client publishes a request but misses the server’s response because it wasn’t subscribed to the dynamic response topic in time. | Fix: 1. Client must send SUBSCRIBE for its unique response topic. 2. Ideally, wait for MQTT_EVENT_SUBSCRIBED (for that specific topic) before sending the PUBLISH request. 3. If immediate publish is done, there’s a race condition; a small delay might work but is not robust. |
Request/Response: Not Handling Timeouts | Client task blocks indefinitely waiting for a response if the server is down, busy, or the response message is lost. | Fix: 1. Implement a timeout mechanism (e.g., using FreeRTOS semaphores with a timeout, or a timer) for each pending request. 2. Define behavior on timeout: retry (with backoff), log error, use a default value, notify user, etc. 3. Clean up any state associated with the timed-out request (e.g., remove from pending list). |
Request/Response: Not Cleaning Up Dynamic Subscriptions | Client subscribes to a new unique response topic for every request (e.g., client/id/response/[correlation_id]) but never unsubscribes. Over time, the client accumulates many subscriptions, consuming resources on client and broker. The broker might eventually reject new subscriptions. | Fix: After a request is completed (response received or timeout), the client must UNSUBSCRIBE from that specific dynamic response topic. |
Message Sequencing: Handling Out-of-Order or Missing Messages | Receiver gets chunks of a multi-part message out of order, or some chunks are missing (especially with QoS 0), leading to corrupted or incomplete data. | Fix (Receiver): 1. Buffer incoming chunks associated by a transaction/group ID. 2. Use sequence numbers to reorder buffered chunks correctly. 3. If total chunks known, detect missing chunks after a timeout or when an out-of-sequence chunk implies a gap. 4. Implement logic for requesting retransmission of missing chunks or failing the transaction. This adds significant complexity. Consider higher QoS for sequences if possible. |
Fan-in: Differentiating Message Sources | A central subscriber using a wildcard (e.g., sensors/+/data) receives messages from many publishers but cannot tell which specific publisher sent which message. | Fix: 1. Topic Structure: The full topic string of the received message (e.g., sensors/device123/data) is available to the subscriber. Parse the device ID from the topic. 2. Payload Content: Ensure publishers include their unique ID within the message payload itself (e.g., {"device_id": "device123", "value": 42}). |
Error Handling: Vague or Missing Error Details | A command fails, but the client receives no error information or only a generic “failed” status, making debugging difficult. | Fix: 1. When reporting errors (in response payloads or dedicated error topics), include specific error codes, human-readable messages, and the original correlation ID. 2. Design a clear set of error codes for your application domain. 3. Log detailed errors on the component that failed. |
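The receiver-side steps listed for message sequencing (buffer by sequence number, detect gaps, reassemble in order) can be illustrated with a small fixed-size buffer. This is only a sketch under stated assumptions: the type and function names are hypothetical, sequence numbers are taken to be 0-based, and the limits MAX_CHUNKS and CHUNK_SIZE are arbitrary values chosen for illustration. Timeouts and retransmission requests are left out.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_CHUNKS 8   /* assumed upper bound on parts per transaction */
#define CHUNK_SIZE 16  /* assumed maximum payload bytes per part */

typedef struct {
    uint8_t total;            /* number of parts announced by the sender */
    uint8_t received_count;
    bool    have[MAX_CHUNKS];
    uint8_t len[MAX_CHUNKS];
    char    data[MAX_CHUNKS][CHUNK_SIZE];
} reassembly_t;

static void reassembly_init(reassembly_t *r, uint8_t total)
{
    memset(r, 0, sizeof(*r));
    r->total = total;
}

/* Store one part by sequence number. Duplicates (possible with QoS 1)
 * are accepted but ignored. Returns false for invalid parts. */
static bool reassembly_add(reassembly_t *r, uint8_t seq, const char *buf, size_t n)
{
    if (r->total > MAX_CHUNKS || seq >= r->total || n > CHUNK_SIZE) {
        return false;
    }
    if (r->have[seq]) {
        return true;
    }
    memcpy(r->data[seq], buf, n);
    r->len[seq] = (uint8_t)n;
    r->have[seq] = true;
    r->received_count++;
    return true;
}

static bool reassembly_complete(const reassembly_t *r)
{
    return r->total > 0 && r->received_count == r->total;
}

/* Index of the first missing part, or -1 if nothing is missing. */
static int reassembly_first_missing(const reassembly_t *r)
{
    for (uint8_t i = 0; i < r->total && i < MAX_CHUNKS; i++) {
        if (!r->have[i]) {
            return i;
        }
    }
    return -1;
}

/* Concatenate all parts in sequence order into a NUL-terminated string.
 * Returns bytes written, or 0 if incomplete or the buffer is too small. */
static size_t reassembly_join(const reassembly_t *r, char *out, size_t out_size)
{
    if (!reassembly_complete(r)) {
        return 0;
    }
    size_t pos = 0;
    for (uint8_t i = 0; i < r->total; i++) {
        if (pos + r->len[i] >= out_size) {
            return 0; /* keep room for the terminator */
        }
        memcpy(out + pos, r->data[i], r->len[i]);
        pos += r->len[i];
    }
    out[pos] = '\0';
    return pos;
}
```

A real receiver would keep one such structure per transaction/group ID and would call reassembly_first_missing after a timeout to decide whether to request retransmission or fail the transaction.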
Exercises
- Implement Request/Response for Time Sync:
- Task: Modify the ESP32 Request/Response example. The ESP32 requests the current UTC time from a “Time Server” (simulated by an MQTT client tool).
- Request Topic: server/time_service/requests
- Request Payload: {"correlation_id": "...", "response_topic": "...", "timezone_requested": "UTC"}
- Response Payload: {"correlation_id": "...", "utc_timestamp": 1678886400, "status": "success"}
- The ESP32 should then log the received timestamp.
- Design: Fan-in Sensor Aggregation:
- Task: You have 5 ESP32s, each measuring temperature in a different room. Design the MQTT topic structure and a conceptual payload for these ESP32s to publish their readings.
- A central application needs to subscribe to all these temperature readings. What topic filter would it use?
- How would the central application identify which room a specific reading came from?
- Design: Multi-Part Firmware Update Notification:
- Task: A server needs to inform an ESP32 about a firmware update that consists of 3 parts (e.g., bootloader, partition table, app). Design the MQTT topic(s) and message payload structure for the server to send these three pieces of information sequentially to a specific ESP32.
- The payload should include sequence numbers, total parts, part type (bootloader, app, etc.), and a data field (e.g., a URL or checksum for that part).
- How would the ESP32 know it has received all parts for a given update transaction?
- Error Reporting for a Command:
- Task: An ESP32 receives a command on devices/[device_id]/commands/set_motor_speed with payload {"speed": 5000}.
- If the requested speed is too high (e.g., > 3000), the ESP32 cannot comply.
- Design two different ways the ESP32 could report this error back via MQTT, assuming the original command was part of a request/response pattern (i.e., a response_topic and correlation_id were provided in the command request). Describe the topics and payloads for both error reporting methods.
Summary
- Request/Response Pattern: Simulates synchronous-like communication over MQTT using unique response topics and correlation IDs for matching replies to requests. Requires client-side state management and timeout handling.
- Fan-out Pattern: One publisher to many subscribers; a natural MQTT use case for broadcasting.
- Fan-in Pattern: Many publishers to one (or few) subscribers; used for data aggregation and collection, often with wildcard subscriptions.
- Message Sequencing: Achieved by adding sequence information (numbers, counts, IDs) to message payloads, enabling reassembly of ordered or chunked data.
- Correlation IDs: Crucial for tracking messages across asynchronous steps and in request/response.
- Shared Subscriptions (MQTT v5.0): Allow load balancing of message processing among a group of subscribers. ESP-IDF’s esp-mqtt (v3.1.1 client) does not natively use this, but awareness is important for v5.0 broker environments.
- Error Handling: Essential in distributed systems; can be part of response payloads or use dedicated error topics.
- These patterns enhance MQTT’s capabilities, allowing for more complex and interactive IoT applications on ESP32 devices.
Further Reading
- “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf: While a general integration book, many patterns (like Request-Reply, Correlation Identifier) are highly relevant.
- HiveMQ Blog Series on MQTT Patterns: HiveMQ often publishes articles on MQTT best practices and design patterns. (Search for “HiveMQ MQTT design patterns”).
- MQTT v5.0 Specification – Shared Subscriptions: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250 (Section 4.8.2, Shared Subscriptions)
- Articles on “RPC over MQTT” or “Request/Response MQTT”: Many community articles and blog posts discuss various implementations of this common pattern.
