Chapter 226: Stream Buffers and Message Buffers
Chapter Objectives
By the end of this chapter, you will be able to:
- Understand the concept of stream buffers and their role in FreeRTOS.
- Understand the concept of message buffers and how they differ from stream buffers.
- Implement stream buffers to pass a continuous stream of bytes between tasks or from an ISR to a task.
- Implement message buffers to pass variable-sized, discrete messages between tasks.
- Identify the appropriate use cases for each buffer type.
- Troubleshoot common issues related to buffer implementation.
Introduction
In previous chapters, we explored queues as a primary mechanism for inter-task communication. Queues are excellent for passing fixed-size data items. However, many real-world applications require passing streams of data where the size is not known in advance. Consider data arriving from a UART peripheral, a network socket, or a file being read from an SD card. In these scenarios, the data arrives as a flow of bytes, not as neat, pre-sized packets.
To handle these situations efficiently, FreeRTOS provides two powerful mechanisms: stream buffers and message buffers. Stream buffers are designed to handle a continuous stream of bytes, while message buffers extend this capability to handle discrete messages of variable lengths. They are lightweight, RAM-efficient, and optimized for single-reader, single-writer scenarios, making them ideal for driver-level data passing and other high-throughput applications on ESP32.
Theory
What Are Stream Buffers?
A stream buffer is a FreeRTOS communication primitive that behaves like a pipe for bytes. It’s a FIFO (First-In, First-Out) buffer that allows a writer to send a continuous stream of bytes and a reader to retrieve them in the same order.
Think of a stream buffer like a water pipe connecting two points. One task (the writer) pours water (bytes) into one end of the pipe, and another task (the reader) collects it from the other end. The pipe can hold a certain amount of water, and the flow is continuous.

Key Characteristics:
- Byte-Oriented: Data is sent and received as a raw stream of bytes. There is no concept of a “message” or “packet.”
- Single Reader, Single Writer: Stream buffers are highly optimized for use cases where there is only one task or ISR writing to the buffer and only one task reading from it. Using them with multiple readers or writers requires external synchronization (like a mutex), which negates their performance benefits.
- Interrupt Safe: The
...FromISR()
API variants allow data to be sent or received safely from within an Interrupt Service Routine. This is a primary use case, such as an ISR for UART receiving data and passing it to a handler task. - Lockless Implementation: For single-core systems (or when writer and reader run on the same core), their implementation can be lockless, making them very fast. On the dual-core ESP32, they use lightweight locking mechanisms.
What Are Message Buffers?
A message buffer is built on top of a stream buffer. While a stream buffer handles an unstructured stream of bytes, a message buffer handles a stream of discrete messages, each with its own variable size.
To achieve this, when you send a message to a message buffer, FreeRTOS writes the length of the message into the buffer first, followed by the message data itself. The receiving task first reads the length, then waits until it has received that exact number of bytes, ensuring it gets the complete, untruncated message.
Key Characteristics:
- Message-Oriented: Data is sent and received in discrete packets. The reader will receive the entire message sent by the writer or nothing at all. It cannot read a partial message.
- Variable Length: The primary advantage is sending data packets of different sizes without the receiving task needing to know the size beforehand.
- Overhead: Because the size of each message must be stored, a message buffer requires slightly more storage space than a stream buffer for the same amount of data. The size of this overhead is the size of a
size_t
variable (typically 4 bytes on ESP32). - Single Reader, Single Writer: Like stream buffers, they are designed and optimized for single-reader, single-writer scenarios.
Stream Buffers vs. Message Buffers
Feature | Stream Buffer | Message Buffer |
---|---|---|
Data Type | Continuous stream of bytes | Discrete messages of variable size |
Atomic Operations | No, a reader can receive any number of available bytes. | Yes, a reader receives a whole message or nothing. |
Size Information | Not stored. The reader requests a number of bytes. | The size of each message is stored with the message. |
Storage Overhead | None. The buffer stores only the data. | Stores a size_t (4 bytes on ESP32) for each message. |
Primary Use Case | Passing raw byte streams, e.g., UART/SPI data from an ISR. | Passing structured but variable-size data, e.g., network packets, log messages. |
Send Function | xStreamBufferSend() |
xMessageBufferSend() |
Receive Function | xStreamBufferReceive() |
xMessageBufferReceive() |
Key Feature | Unblocks reader based on a “trigger level”. | Preserves message boundaries. |
Tip: Choose a stream buffer when you are working with raw, unstructured serial data. Choose a message buffer when you need to preserve the boundaries between variable-length data packets.
Practical Examples
Example 1: Stream Buffer for Simulated UART Data
In this example, we will create a system with two tasks. The producer_task
will simulate receiving data (like from a UART peripheral) and write it to a stream buffer. The consumer_task
will read this data from the buffer and print it to the console.
%%{init: {'theme': 'base', 'themeVariables': { 'fontFamily': 'Open Sans'}}}%% flowchart TD subgraph Producer Task direction TB P_START(<b>Start Producer</b>) --> P_LOOP_START(Loop Start) P_LOOP_START --> P_SEND_DATA["Send part of data <br> xStreamBufferSend()"] P_SEND_DATA --> P_CHECK_SENT{All bytes sent?} P_CHECK_SENT -- Yes --> P_DELAY["vTaskDelay(500ms)"] P_CHECK_SENT -- No --> P_LOG_ERROR["/Log Buffer Full/"] --> P_DELAY P_DELAY --> P_LOOP_END{More parts to send?} P_LOOP_END -- Yes --> P_LOOP_START P_LOOP_END -- No --> P_END(<b>End Producer</b>) end subgraph Consumer Task direction TB C_START(<b>Start Consumer</b>) --> C_WAIT["Wait for data <br> xStreamBufferReceive()"] C_WAIT --> C_CHECK_TRIGGER{"Trigger Level Met?<br>(100 bytes available)"} C_CHECK_TRIGGER -- No --> C_WAIT C_CHECK_TRIGGER -- Yes --> C_READ[Read available bytes <br> from buffer] C_READ --> C_PROCESS[Process/Print Data] C_PROCESS --> C_WAIT end subgraph Stream Buffer State direction TB SB_EMPTY([Buffer: 0 bytes]) --> SB_FILLING([Buffer: 28 bytes]) SB_FILLING --> SB_FILLING2([Buffer: 56 bytes]) SB_FILLING2 --> SB_TRIGGER_MET([Buffer: 151 bytes]) SB_TRIGGER_MET --> SB_READING([Buffer: 51 bytes]) end %% Styling classDef start-end fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6; classDef process fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF; classDef decision fill:#FEF3C7,stroke:#D97706,stroke-width:1px,color:#92400E; classDef check fill:#FEE2E2,stroke:#DC2626,stroke-width:1px,color:#991B1B; classDef success fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46; class L1,P_START,P_END,C_START start-end; class L2,P_CHECK_SENT,P_LOOP_END,C_CHECK_TRIGGER decision; class L3,P_LOOP_START,P_SEND_DATA,P_DELAY,C_WAIT,C_READ,C_PROCESS,SB_EMPTY,SB_FILLING,SB_FILLING2,SB_TRIGGER_MET,SB_READING process; class L4,P_LOG_ERROR check; %% Connections P_SEND_DATA -- "sends 28 bytes" --> SB_FILLING P_SEND_DATA -- "sends another 28 bytes" --> SB_FILLING2 P_SEND_DATA -- "sends 95 bytes" --> SB_TRIGGER_MET SB_TRIGGER_MET -- "unblocks Consumer" --> C_CHECK_TRIGGER C_READ -- "reads 100 bytes" --> SB_READING
Code
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/stream_buffer.h"
#include "esp_log.h"
static const char *TAG = "STREAM_BUFFER_EXAMPLE";
// Define buffer size and trigger level
#define STREAM_BUFFER_SIZE_BYTES 1024
#define TRIGGER_LEVEL_BYTES 100 // The consumer task will unblock when 100 bytes are available
// Handle for the stream buffer
static StreamBufferHandle_t xStreamBuffer;
/**
* @brief Producer Task: Simulates receiving data and sending it to the stream buffer.
* * In a real application, this task might be replaced by an ISR that receives
* data from a peripheral like UART, SPI, or I2C.
*/
static void producer_task(void *pvParameters)
{
ESP_LOGI(TAG, "Producer task started");
const char *test_data[] = {
"This is the first sentence. ",
"Here comes the second part. ",
"And this is a much, much, much, much, much, much, much, much, much, much longer third part to ensure we cross the trigger level. ",
"Finally, the end."
};
int part_count = sizeof(test_data) / sizeof(test_data[0]);
for (int i = 0; i < 2; i++) // Run the simulation twice
{
ESP_LOGI(TAG, "Producer: Starting data transmission cycle %d", i + 1);
for (int j = 0; j < part_count; j++)
{
size_t data_len = strlen(test_data[j]);
ESP_LOGI(TAG, "Producer: Sending %d bytes: '%s'", data_len, test_data[j]);
// Send data to the stream buffer
size_t bytes_sent = xStreamBufferSend(xStreamBuffer,
(void *) test_data[j],
data_len,
pdMS_TO_TICKS(100)); // 100ms timeout
if (bytes_sent != data_len) {
ESP_LOGE(TAG, "Producer: Failed to send all data. Buffer might be full.");
}
vTaskDelay(pdMS_TO_TICKS(500)); // Wait before sending next part
}
vTaskDelay(pdMS_TO_TICKS(2000)); // Wait before next cycle
}
ESP_LOGI(TAG, "Producer task finished and will be deleted.");
vTaskDelete(NULL);
}
/**
* @brief Consumer Task: Waits for data in the stream buffer and processes it.
*
* This task waits until the trigger level is reached before it starts reading.
*/
static void consumer_task(void *pvParameters)
{
ESP_LOGI(TAG, "Consumer task started");
uint8_t rx_buffer[256]; // Buffer to hold received data
while (1)
{
// Zero out the buffer to avoid printing old data
memset(rx_buffer, 0, sizeof(rx_buffer));
// Wait to receive data from the stream buffer.
// The task will block until the trigger level (100 bytes) is met.
// After being unblocked, it will continue to read data as it arrives.
size_t bytes_received = xStreamBufferReceive(xStreamBuffer,
(void *) rx_buffer,
sizeof(rx_buffer) - 1, // Leave space for null terminator
portMAX_DELAY); // Wait indefinitely
if (bytes_received > 0)
{
// Data successfully received
ESP_LOGI(TAG, "Consumer: Received %d bytes: '%s'", bytes_received, (char*)rx_buffer);
}
else
{
// This part should not be reached if portMAX_DELAY is used,
// but is good practice for handling timeouts.
ESP_LOGW(TAG, "Consumer: No data received.");
}
}
}
void app_main(void)
{
ESP_LOGI(TAG, "Creating stream buffer...");
// Create the stream buffer.
xStreamBuffer = xStreamBufferCreate(STREAM_BUFFER_SIZE_BYTES, TRIGGER_LEVEL_BYTES);
if (xStreamBuffer == NULL) {
ESP_LOGE(TAG, "Failed to create stream buffer.");
return;
}
ESP_LOGI(TAG, "Creating producer and consumer tasks...");
xTaskCreate(producer_task, "producer_task", 4096, NULL, 5, NULL);
xTaskCreate(consumer_task, "consumer_task", 4096, NULL, 5, NULL);
}
Build and Run Steps
- Open VS Code with the ESP-IDF extension installed.
- Create a new project using the “ESP-IDF: New Project” command. Choose a template like
sample_project
. - Copy the code above and replace the contents of
main/main.c
. - Connect your ESP32 board.
- Use the “ESP-IDF: Build, Flash, Monitor” command (or the combined icon in the status bar).
- Observe the Output: You will see the producer sending data parts. The consumer will remain blocked until over 100 bytes have been sent. Once the trigger level is reached, the consumer will unblock and start printing the received data.
Example 2: Message Buffer for Variable-Length “Packets”
This example demonstrates a message buffer. The producer sends messages of varying lengths, and the consumer reads them one complete message at a time.
%%{init: {'theme': 'base', 'themeVariables': { 'fontFamily': 'Open Sans'}}}%% flowchart TD subgraph Producer Task P_START(<b>Start Producer</b>) --> P_LOOP_START(For each message...) P_LOOP_START --> P_SEND["Send message with length<br><b>xMessageBufferSend()</b>"] P_SEND --> P_CHECK{Buffer Full?} P_CHECK -- No --> P_DELAY["vTaskDelay(1000ms)"] P_CHECK -- Yes --> P_LOG_ERROR[/Log Error & Wait/] --> P_DELAY P_DELAY --> P_LOOP_END{More messages?} P_LOOP_END -- Yes --> P_LOOP_START P_LOOP_END -- No --> P_END(<b>End Producer</b>) end subgraph Consumer Task C_START(<b>Start Consumer</b>) --> C_WAIT["Wait for a complete message<br><b>xMessageBufferReceive()</b>"] C_WAIT --> C_CHECK{Message Received?} C_CHECK -- No --> C_WAIT C_CHECK -- Yes --> C_READ["Read entire message <br> (Size + Data)"] C_READ --> C_PROCESS[Process/Print Message] C_PROCESS --> C_DONE_CHECK{"Message is Done?"} C_DONE_CHECK -- No --> C_WAIT C_DONE_CHECK -- Yes --> C_END(<b>End Consumer</b>) end subgraph Message Buffer Interaction direction LR P_SEND -- "Message 1 (13 bytes)" --> BUFFER(["<b>Message Buffer</b><br>[Len:13 | 'Short message']"]) BUFFER -- "Consumer reads 1" --> C_READ P_SEND -- "Message 2 (32 bytes)" --> BUFFER2(["<b>Message Buffer</b><br>[Len:32 | 'This is a medium...']"]) BUFFER2 -- "Consumer reads 2" --> C_READ end %% Styling classDef start-end fill:#EDE9FE,stroke:#5B21B6,stroke-width:2px,color:#5B21B6; classDef process fill:#DBEAFE,stroke:#2563EB,stroke-width:1px,color:#1E40AF; classDef decision fill:#FEF3C7,stroke:#D97706,stroke-width:1px,color:#92400E; classDef check fill:#FEE2E2,stroke:#DC2626,stroke-width:1px,color:#991B1B; classDef success fill:#D1FAE5,stroke:#059669,stroke-width:2px,color:#065F46; class P_START,P_END,C_START,C_END start-end; class P_LOOP_START,P_SEND,P_DELAY,C_WAIT,C_READ,C_PROCESS,BUFFER,BUFFER2 process; class P_CHECK,P_LOOP_END,C_CHECK,C_DONE_CHECK decision; class P_LOG_ERROR check;
Code
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/message_buffer.h"
#include "esp_log.h"
static const char *TAG = "MESSAGE_BUFFER_EXAMPLE";
// Define buffer size. The actual usable space will be slightly less due to overhead.
// For message buffers, a trigger level is not used.
#define MESSAGE_BUFFER_SIZE_BYTES 1024
// Handle for the message buffer
static MessageBufferHandle_t xMessageBuffer;
/**
* @brief Producer Task: Creates and sends variable-sized messages.
*/
static void producer_task(void *pvParameters)
{
ESP_LOGI(TAG, "Producer task started");
// A few messages of different lengths
const char *messages[] = {
"Short message",
"This is a medium-length message.",
"This one is considerably longer to show how variable sizes are handled with ease.",
"Done"
};
int num_messages = sizeof(messages) / sizeof(messages[0]);
for (int i = 0; i < num_messages; i++)
{
size_t msg_len = strlen(messages[i]);
ESP_LOGI(TAG, "Producer: Sending message of %d bytes: '%s'", msg_len, messages[i]);
// Send a message to the message buffer.
// FreeRTOS will prepend the message length automatically.
size_t bytes_sent = xMessageBufferSend(xMessageBuffer,
(void *) messages[i],
msg_len,
pdMS_TO_TICKS(100)); // 100ms timeout
if (bytes_sent != msg_len) {
ESP_LOGE(TAG, "Producer: Failed to send complete message. Buffer full?");
}
vTaskDelay(pdMS_TO_TICKS(1000)); // Wait a second before sending the next message
}
ESP_LOGI(TAG, "Producer task finished and will be deleted.");
vTaskDelete(NULL);
}
/**
* @brief Consumer Task: Receives and processes complete messages.
*/
static void consumer_task(void *pvParameters)
{
ESP_LOGI(TAG, "Consumer task started");
char rx_buffer[256]; // A buffer large enough for the biggest expected message
while (1)
{
memset(rx_buffer, 0, sizeof(rx_buffer));
// Receive one complete message from the buffer.
// The function returns the actual size of the received message.
size_t bytes_received = xMessageBufferReceive(xMessageBuffer,
(void *) rx_buffer,
sizeof(rx_buffer),
portMAX_DELAY); // Wait indefinitely
if (bytes_received > 0)
{
ESP_LOGI(TAG, "Consumer: Received a complete message of %d bytes: '%s'", bytes_received, rx_buffer);
// Check if this is the last message
if (strcmp(rx_buffer, "Done") == 0) {
ESP_LOGI(TAG, "Consumer: 'Done' message received. Exiting.");
break; // Exit the loop
}
}
}
ESP_LOGI(TAG, "Consumer task finished and will be deleted.");
vTaskDelete(NULL);
}
void app_main(void)
{
ESP_LOGI(TAG, "Creating message buffer...");
// Create the message buffer.
xMessageBuffer = xMessageBufferCreate(MESSAGE_BUFFER_SIZE_BYTES);
if (xMessageBuffer == NULL) {
ESP_LOGE(TAG, "Failed to create message buffer.");
return;
}
ESP_LOGI(TAG, "Creating producer and consumer tasks...");
xTaskCreate(producer_task, "producer_task", 4096, NULL, 5, NULL);
xTaskCreate(consumer_task, "consumer_task", 4096, NULL, 5, NULL);
}
Build and Run Steps
- Follow the same project setup steps as in the previous example.
- Replace the code in
main/main.c
with the message buffer example. - Build, flash, and monitor the device.
- Observe the Output: You will see the producer sending messages of different lengths. The consumer receives each one as a distinct, complete package, printing its content and exact size.
Variant Notes
Stream buffers and message buffers are core components of the FreeRTOS kernel. As such, their functionality is identical across all ESP32 family variants, including ESP32, ESP32-S2, ESP32-S3, ESP32-C3, ESP32-C6, and ESP32-H2.
The underlying implementation is hardware-agnostic. The primary consideration that changes between variants is the total amount of available RAM. On memory-constrained variants like the ESP32-C3, you will need to be more judicious about the size of the buffers you create (STREAM_BUFFER_SIZE_BYTES
or MESSAGE_BUFFER_SIZE_BYTES
). On variants with ample RAM like the ESP32-S3, you have much more flexibility. The logic and API calls remain exactly the same.
Common Mistakes & Troubleshooting Tips
Mistake / Issue | Symptom(s) | Troubleshooting / Solution |
---|---|---|
Buffer Full / Send Timeout | x...Send() returns 0 or fewer bytes than attempted. Producer task logs “Failed to send”. |
1. Consumer too slow: Ensure the consumer task can process data faster than the producer sends it on average. 2. Increase Buffer Size: If data arrives in large bursts, increase the buffer size in x...Create() .3. Check Timeout: A short timeout might not be enough. Consider a longer one or portMAX_DELAY .
|
Using Wrong API Family | Data seems corrupted, has strange leading bytes, or crashes. E.g., using xStreamBufferReceive() on a message buffer. |
Always match the functions to the buffer type: – Stream Buffer: Use xStreamBuffer...() functions.– Message Buffer: Use xMessageBuffer...() functions.
|
Forgetting Message Buffer Overhead | Buffer seems to hold less data than allocated. xMessageBufferSend() fails unexpectedly. |
Account for overhead in size calculation. Each message adds sizeof(size_t) (4 bytes) for its length.Required size = (Sum of all message data sizes) + (Number of messages * 4). |
Receiver Buffer Too Small (Message Buffers) | xMessageBufferReceive() returns 0, even though data is in the buffer. The consumer task is permanently blocked. |
The buffer passed to xMessageBufferReceive() must be large enough for the biggest possible message.Increase the size of your rx_buffer array.
|
Multi-Writer/Reader Without Protection | Race conditions, data corruption, overwritten data, unpredictable behavior, crashes. |
These buffers are for single-reader/single-writer use. – For many-to-one or many-to-many, use a FreeRTOS Queue. – If you must use a buffer, protect all send and receive calls with a xSemaphoreTake() and xSemaphoreGive() .
|
Exercises
- Real UART to Stream Buffer: Modify the first example. Instead of a
producer_task
that simulates data, initialize the UART peripheral. Create a UART event task (or use an ISR if you feel advanced) that reads incoming data from the UART RX pin and sends it directly to the stream buffer. Theconsumer_task
will then print data typed into a serial terminal. - Protected Message Buffer: Create a system with one
producer_task
and two identicalconsumer_task
s that all share a single message buffer. Because message buffers are not safe for multiple readers, implement a mutex. Before a consumer task attempts to read from the message buffer, it must take the mutex. After it finishes reading, it must give the mutex back. Observe how the two consumer tasks take turns processing messages. - Centralized Logging System: Design a logging system using a message buffer. Create 3 different tasks, each performing a different “job” (e.g., “WiFi Task”, “Sensor Task”, “UI Task”). When an event occurs in any of these tasks, they should format a string (e.g.,
"Sensor Task: Temperature is 25.3 C"
) and send it to a centrallogging_task
via a message buffer. Thelogging_task
‘s only job is to wait for messages and print them to the console, prefixed with a timestamp. This demonstrates a practical, many-to-one use case (which still requires a mutex to protect the multiple writers).
Summary
- Stream Buffers are ideal for transferring a continuous, unstructured stream of bytes, especially from an ISR to a task. They are FIFO byte pipes.
- Message Buffers are built on stream buffers and are used for transferring variable-length but discrete messages. They preserve message boundaries.
- Both are optimized for single-reader, single-writer scenarios, which makes them very fast and efficient.
- Stream buffers use a trigger level to unblock a waiting reader, while message buffers do not.
- Message buffers have a small storage overhead (
sizeof(size_t)
) for each message to store its length. - The choice between them depends on the data: use stream buffers for raw streams (e.g., serial data) and message buffers for packetized data (e.g., network packets, log entries).
- Functionality is identical across all ESP32 variants, with available RAM being the only practical difference.
Further Reading
- FreeRTOS Official Documentation on Stream Buffers: https://www.freertos.org/stream-buffers.html
- FreeRTOS Official Documentation on Message Buffers: https://www.freertos.org/message-buffers.html
- ESP-IDF Programming Guide – RTOS: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/system/freertos.html