CODE¶
Warning
The following code should be based on the code in the release code, which may have been updated.
arch_producer_consumer.h¶
/**
* @file arch_producer_consumer.h
* @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
* @brief Architecture 1: Producer-Consumer Queue Implementation
* @version 1.0
* @date 2025-01-17
* @copyright Copyright (c) 2025
*
* @details
* This architecture uses a producer-consumer pattern with circular buffer:
* - Producer task (high priority): Reads sensor data and writes to circular buffer
* - Consumer task (medium priority): Reads from circular buffer and processes data
* - Circular buffer with mutex provides synchronization
*
* Advantages:
* - Simple implementation
* - Decouples acquisition and processing
* - Suitable for medium frequency (0.1-1000 Hz, recommended: 100 Hz - 1 kHz)
*/
#pragma once
#include "arch_common.h" // Includes rt_process_config_t
#include "node_acc_adxl355.h"
#include "esp_err.h"
#ifdef __cplusplus
extern "C"
{
#endif
/* ============================================================================
* CONFIGURATION
* ============================================================================ */
/**
* @brief Default queue size for producer-consumer architecture
* @note Uses TINY_MEASUREMENT_RT_PC_QUEUE_SIZE_DEFAULT from tiny_measurement_config.h
*/
#ifndef RT_PC_QUEUE_SIZE_DEFAULT
#define RT_PC_QUEUE_SIZE_DEFAULT TINY_MEASUREMENT_RT_PC_QUEUE_SIZE_DEFAULT
#endif
/**
* @brief Producer task priority (high priority for timely acquisition)
* @note Uses TINY_MEASUREMENT_RT_PC_PRODUCER_PRIORITY from tiny_measurement_config.h
*/
#ifndef RT_PC_PRODUCER_PRIORITY
#define RT_PC_PRODUCER_PRIORITY TINY_MEASUREMENT_RT_PC_PRODUCER_PRIORITY
#endif
/**
* @brief Consumer task priority (high priority for processing, close to producer)
* @note Uses TINY_MEASUREMENT_RT_PC_CONSUMER_PRIORITY from tiny_measurement_config.h
*/
#ifndef RT_PC_CONSUMER_PRIORITY
#define RT_PC_CONSUMER_PRIORITY TINY_MEASUREMENT_RT_PC_CONSUMER_PRIORITY
#endif
/**
* @brief Producer task stack size
* @note Uses TINY_MEASUREMENT_RT_PC_PRODUCER_STACK_SIZE from tiny_measurement_config.h
*/
#ifndef RT_PC_PRODUCER_STACK_SIZE
#define RT_PC_PRODUCER_STACK_SIZE TINY_MEASUREMENT_RT_PC_PRODUCER_STACK_SIZE
#endif
/**
* @brief Consumer task stack size
* @note Uses TINY_MEASUREMENT_RT_PC_CONSUMER_STACK_SIZE from tiny_measurement_config.h
*/
#ifndef RT_PC_CONSUMER_STACK_SIZE
#define RT_PC_CONSUMER_STACK_SIZE TINY_MEASUREMENT_RT_PC_CONSUMER_STACK_SIZE
#endif
/* ============================================================================
* FUNCTION DECLARATIONS
* ============================================================================ */
/**
* @brief Initialize producer-consumer architecture
* @param config Configuration structure
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_init(const rt_process_config_t *config);
/**
* @brief Set sensor handle for producer-consumer architecture
* @param handle ADXL355 sensor handle
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle);
/**
* @brief Start producer-consumer architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_start(void);
/**
* @brief Stop producer-consumer architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_stop(void);
/**
* @brief Deinitialize producer-consumer architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_deinit(void);
/**
* @brief Get status of producer-consumer architecture
* @param status Output parameter for status
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_get_status(rt_process_status_t *status);
/**
* @brief Get statistics of producer-consumer architecture
* @param stats Output parameter for statistics
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_get_stats(rt_process_stats_t *stats);
/**
* @brief Set sampling frequency for producer-consumer architecture
* @param frequency_hz New sampling frequency in Hz
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_set_frequency(float frequency_hz);
/**
* @brief Check if producer-consumer architecture is running
* @param is_running Output parameter: true if running
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_is_running(bool *is_running);
/**
* @brief Enable or disable acceleration detection with LCD feedback
* @param enable true to enable detection, false to disable
* @return ESP_OK on success, error code on failure
* @note When enabled, LCD will show RED when conditions are met, WHITE otherwise
* @note Conditions: |x| > 0.5g OR |y| > 0.5g OR |z| < 0.5g
*/
esp_err_t arch_pc_set_accel_detection(bool enable);
#ifdef __cplusplus
}
#endif
arch_producer_consumer.c¶
/**
* @file arch_producer_consumer.c
* @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
* @brief Architecture 1: Producer-Consumer Queue Implementation
* @version 1.0
* @date 2025-01-17
* @copyright Copyright (c) 2025
*
*/
#include "arch_producer_consumer.h"
#include "node_mqtt.h" // Provides s_mqtt_client, s_is_mqtt_connected, MQTT_PUBLISH_TOPIC
#include "node_lcd.h" // LCD control for acceleration detection
#include "esp_log.h"
#include "esp_timer.h"
#include "esp_heap_caps.h" // For PSRAM allocation
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include <string.h>
#include <stdio.h>
#include <math.h>
/* ============================================================================
* PRIVATE DEFINITIONS
* ============================================================================ */
static const char *TAG = "RTArch_PC";
/* ============================================================================
* PRIVATE VARIABLES
* ============================================================================ */
static rt_process_config_t s_config = {
.sampling_frequency_hz = 100.0f,
.queue_size = RT_PC_QUEUE_SIZE_DEFAULT,
.enable_mqtt = false,
.enable_serial = true,
.enable_accel_detection = false,
.mqtt_topic = NULL
};
// Circular buffer structure
typedef struct {
rt_process_sample_t *buffer; // Data buffer (PSRAM)
uint32_t buffer_size; // Buffer size (number of samples)
volatile uint32_t write_ptr; // Write pointer (producer) - 写头
volatile uint32_t read_ptr; // Read pointer (consumer) - 读头
SemaphoreHandle_t mutex; // Mutex for synchronization
uint32_t overwrite_count; // Count of overwrites (monitoring)
uint32_t total_writes; // Total number of writes
uint32_t total_reads; // Total number of reads
} rt_circular_buffer_t;
static rt_circular_buffer_t s_circular_buffer = {0};
static TaskHandle_t s_producer_task_handle = NULL;
static TaskHandle_t s_consumer_task_handle = NULL;
static esp_timer_handle_t s_sampling_timer = NULL;
static bool s_is_running = false;
static bool s_is_initialized = false;
static adxl355_handle_t *s_adxl355_handle = NULL;
// Statistics
static rt_process_stats_t s_stats = {0};
// Acceleration detection state
static bool s_enable_accel_detection = false; // Enable acceleration detection and LCD control
static uint16_t s_current_lcd_color = 0xFFFF; // Current LCD color (WHITE)
static uint64_t s_last_lcd_update_us = 0;
static uint64_t s_color_hold_until_us = 0; // Time until which color should be held (for persistence)
static bool s_color_hold_active = false; // Whether color hold is currently active
static bool s_lcd_update_in_progress = false; // Flag to prevent concurrent LCD updates
// LCD color hold duration: 0.3 seconds
#define LCD_COLOR_HOLD_DURATION_US 300000U // 0.3 seconds (300ms) in microseconds
// LCD update minimum interval to avoid SPI bus conflict
#define LCD_UPDATE_MIN_INTERVAL_US 100000U // Minimum 100ms between LCD updates
// Detection thresholds
#define ACCEL_THRESHOLD_X 0.5f // Condition 1: |x| > 0.5g
#define ACCEL_THRESHOLD_Y 0.5f // Condition 2: |y| > 0.5g
#define ACCEL_THRESHOLD_Z 0.5f // Condition 3: |z| < 0.5g
// Circular buffer configuration
#define CIRCULAR_BUFFER_SIZE 512 // Buffer size: 512 samples
#define CONSUMER_PROCESS_INTERVAL_MS 50 // Consumer processes buffer every 50ms
#define CONSUMER_PROCESS_SAMPLE_COUNT 10 // Process last 10 samples for real-time response
/* ============================================================================
* PRIVATE FUNCTION DECLARATIONS
* ============================================================================ */
static void sampling_timer_callback(void *arg);
static void producer_task(void *pvParameters);
static void consumer_task(void *pvParameters);
static void update_stats_acquisition(uint64_t time_us);
static void update_stats_processing(uint64_t time_us);
/* ============================================================================
* PRIVATE FUNCTION IMPLEMENTATIONS
* ============================================================================ */
/**
* @brief Timer callback for periodic sampling (triggers producer task)
*/
static void sampling_timer_callback(void *arg)
{
if (!s_is_running || s_adxl355_handle == NULL)
{
return;
}
// Notify producer task to read sensor
if (s_producer_task_handle != NULL)
{
xTaskNotify(s_producer_task_handle, 1, eSetBits);
}
}
/**
* @brief Producer task: Reads sensor data and enqueues
*/
static void producer_task(void *pvParameters)
{
rt_process_sample_t sample;
uint32_t notification_value;
int64_t start_time;
ESP_LOGI(TAG, "Producer task started");
while (s_is_running)
{
// Wait for timer notification
if (xTaskNotifyWait(0, ULONG_MAX, ¬ification_value, portMAX_DELAY) == pdTRUE)
{
if (!s_is_running || s_adxl355_handle == NULL)
{
break;
}
start_time = esp_timer_get_time();
// Read sensor data
adxl355_accelerations_t accel;
float temperature;
esp_err_t accel_ret = adxl355_read_accelerations(s_adxl355_handle, &accel);
esp_err_t temp_ret = adxl355_read_temperature(s_adxl355_handle, &temperature);
if (accel_ret == ESP_OK && temp_ret == ESP_OK)
{
// Prepare sample
sample.x = accel.x;
sample.y = accel.y;
sample.z = accel.z;
sample.temp = temperature;
sample.timestamp_us = esp_timer_get_time();
// Update acquisition statistics
update_stats_acquisition(esp_timer_get_time() - start_time);
// Write to circular buffer (FIFO overwrite)
if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
{
// Take mutex to protect buffer access
if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(10)) == pdTRUE)
{
// Write sample to current write position
uint32_t write_idx = s_circular_buffer.write_ptr;
s_circular_buffer.buffer[write_idx] = sample;
// Update write pointer (circular)
uint32_t next_write_ptr = (write_idx + 1) % s_circular_buffer.buffer_size;
// Check if write pointer will catch up with read pointer (buffer full)
if (next_write_ptr == s_circular_buffer.read_ptr && s_circular_buffer.total_writes >= s_circular_buffer.buffer_size)
{
// Buffer full: advance read pointer (FIFO overwrite)
s_circular_buffer.read_ptr = (s_circular_buffer.read_ptr + 1) % s_circular_buffer.buffer_size;
s_circular_buffer.overwrite_count++;
}
s_circular_buffer.write_ptr = next_write_ptr;
s_circular_buffer.total_writes++;
xSemaphoreGive(s_circular_buffer.mutex);
s_stats.total_samples++;
}
else
{
// Mutex timeout - drop sample
s_stats.dropped_samples++;
ESP_LOGW(TAG, "Mutex timeout, sample dropped");
}
}
else
{
s_stats.dropped_samples++;
ESP_LOGW(TAG, "Circular buffer not initialized, sample dropped");
}
}
else
{
ESP_LOGE(TAG, "Failed to read sensor: accel=%s, temp=%s",
esp_err_to_name(accel_ret), esp_err_to_name(temp_ret));
}
}
}
ESP_LOGI(TAG, "Producer task stopped");
vTaskDelete(NULL);
}
/**
* @brief Consumer task: Periodically extracts entire buffer snapshot for analysis
*/
static void consumer_task(void *pvParameters)
{
// Local buffer to store last N samples (fast access)
rt_process_sample_t recent_samples[CONSUMER_PROCESS_SAMPLE_COUNT];
ESP_LOGI(TAG, "Consumer task started (circular buffer mode, processing last %d samples)",
CONSUMER_PROCESS_SAMPLE_COUNT);
while (s_is_running)
{
// Wait for processing interval
vTaskDelay(pdMS_TO_TICKS(CONSUMER_PROCESS_INTERVAL_MS));
if (!s_is_running)
{
break;
}
// Extract entire buffer snapshot
if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
{
int64_t process_start_time = esp_timer_get_time();
// Take mutex to protect buffer access
if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(50)) == pdTRUE)
{
// Standard circular buffer read: read from read_ptr
uint32_t write_ptr = s_circular_buffer.write_ptr;
uint32_t read_ptr = s_circular_buffer.read_ptr;
uint32_t overwrite_count = s_circular_buffer.overwrite_count;
// Calculate available samples (distance between read_ptr and write_ptr)
uint32_t available_samples;
if (write_ptr >= read_ptr)
{
available_samples = write_ptr - read_ptr; // Normal case
}
else
{
available_samples = (s_circular_buffer.buffer_size - read_ptr) + write_ptr; // Wrapped around
}
// Read samples from read pointer (up to CONSUMER_PROCESS_SAMPLE_COUNT)
uint32_t samples_to_read = (available_samples > CONSUMER_PROCESS_SAMPLE_COUNT) ?
CONSUMER_PROCESS_SAMPLE_COUNT : available_samples;
if (samples_to_read > 0)
{
// Copy samples starting from read pointer
for (uint32_t i = 0; i < samples_to_read; i++)
{
uint32_t src_idx = (read_ptr + i) % s_circular_buffer.buffer_size;
recent_samples[i] = s_circular_buffer.buffer[src_idx];
}
// Update read pointer (advance read head)
s_circular_buffer.read_ptr = (read_ptr + samples_to_read) % s_circular_buffer.buffer_size;
s_circular_buffer.total_reads += samples_to_read;
// Fill remaining slots with zeros if not enough samples
for (uint32_t i = samples_to_read; i < CONSUMER_PROCESS_SAMPLE_COUNT; i++)
{
recent_samples[i].x = 0.0f;
recent_samples[i].y = 0.0f;
recent_samples[i].z = 0.0f;
recent_samples[i].temp = 0.0f;
recent_samples[i].timestamp_us = 0;
}
}
else
{
// No data available, fill with zeros
memset(recent_samples, 0, CONSUMER_PROCESS_SAMPLE_COUNT * sizeof(rt_process_sample_t));
}
xSemaphoreGive(s_circular_buffer.mutex);
// Now process the recent samples
// This is where you can add FFT, filtering, feature extraction, etc.
// Update processing statistics
uint64_t process_time_us = esp_timer_get_time() - process_start_time;
update_stats_processing(process_time_us);
// Only count actual samples read
uint32_t actual_samples = samples_to_read;
s_stats.processed_samples += actual_samples;
// Use actual_samples for processing
uint32_t samples_to_process = actual_samples > 0 ? actual_samples : CONSUMER_PROCESS_SAMPLE_COUNT;
// Acceleration detection based on recent samples
if (s_enable_accel_detection)
{
// Check if any of the recent samples meets conditions
bool condition_met = false;
for (uint32_t i = 0; i < samples_to_process; i++)
{
if ((fabsf(recent_samples[i].x) > ACCEL_THRESHOLD_X) ||
(fabsf(recent_samples[i].y) > ACCEL_THRESHOLD_Y) ||
(fabsf(recent_samples[i].z) < ACCEL_THRESHOLD_Z))
{
condition_met = true;
break;
}
}
// LCD control (same logic as before)
uint64_t current_time = esp_timer_get_time();
uint16_t target_color;
bool should_update = false;
if (condition_met)
{
target_color = RED;
if (!s_color_hold_active || s_current_lcd_color != RED)
{
s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
s_color_hold_active = true;
should_update = true;
ESP_LOGI(TAG, "LCD -> RED (buffer analysis)");
}
else
{
s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
}
}
else
{
if (s_color_hold_active && current_time < s_color_hold_until_us)
{
target_color = RED;
}
else
{
target_color = WHITE;
if (s_color_hold_active)
{
s_color_hold_active = false;
should_update = true;
ESP_LOGI(TAG, "LCD -> WHITE (hold expired)");
}
}
}
if (should_update && target_color != s_current_lcd_color)
{
// Check minimum interval and update in progress flag to avoid SPI bus conflict
uint64_t time_since_last_update = current_time - s_last_lcd_update_us;
if (!s_lcd_update_in_progress && time_since_last_update >= LCD_UPDATE_MIN_INTERVAL_US)
{
s_lcd_update_in_progress = true;
lcd_clear(target_color);
s_current_lcd_color = target_color;
s_last_lcd_update_us = current_time;
s_lcd_update_in_progress = false;
}
else
{
if (s_lcd_update_in_progress)
{
ESP_LOGD(TAG, "LCD update skipped: previous update still in progress");
}
else
{
ESP_LOGD(TAG, "LCD update skipped: too frequent (last update %llu us ago)",
time_since_last_update);
}
}
}
}
// Output analysis results
if (s_config.enable_serial)
{
// Output recent samples processing info
printf("RECENT_SAMPLES: count=%u, overwrites=%lu, process_time=%llu us\n",
(unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT,
(unsigned long)overwrite_count, process_time_us);
}
// Log periodically
static uint32_t log_counter = 0;
if (++log_counter >= 20) // Log every 20 processing cycles (1 second at 50ms interval)
{
ESP_LOGI(TAG, "Recent samples processed: %u samples, overwrites=%lu, process_time=%llu us",
(unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT, (unsigned long)overwrite_count, process_time_us);
log_counter = 0;
}
}
else
{
ESP_LOGW(TAG, "Failed to acquire mutex for buffer snapshot");
}
}
}
ESP_LOGI(TAG, "Consumer task stopped");
vTaskDelete(NULL);
}
/**
* @brief Update acquisition time statistics
*/
static void update_stats_acquisition(uint64_t time_us)
{
// Simple moving average
if (s_stats.total_samples == 0)
{
s_stats.avg_acquisition_time_us = (float)time_us;
}
else
{
s_stats.avg_acquisition_time_us =
(s_stats.avg_acquisition_time_us * 0.9f) + ((float)time_us * 0.1f);
}
}
/**
* @brief Update processing time statistics
*/
static void update_stats_processing(uint64_t time_us)
{
// Simple moving average
if (s_stats.processed_samples == 0)
{
s_stats.avg_process_time_us = (float)time_us;
}
else
{
s_stats.avg_process_time_us =
(s_stats.avg_process_time_us * 0.9f) + ((float)time_us * 0.1f);
}
}
/* ============================================================================
* PUBLIC FUNCTION IMPLEMENTATIONS
* ============================================================================ */
esp_err_t arch_pc_init(const rt_process_config_t *config)
{
if (s_is_initialized)
{
ESP_LOGW(TAG, "Architecture already initialized");
return ESP_ERR_INVALID_STATE;
}
// Apply configuration
if (config != NULL)
{
memcpy(&s_config, config, sizeof(rt_process_config_t));
}
else
{
// Use defaults
s_config.sampling_frequency_hz = 100.0f;
s_config.queue_size = RT_PC_QUEUE_SIZE_DEFAULT;
s_config.enable_mqtt = false; // Default: MQTT disabled
s_config.enable_serial = true; // Default: Serial enabled
s_config.enable_accel_detection = false; // Default: Detection disabled
s_config.mqtt_topic = NULL;
}
// Set acceleration detection state from config
s_enable_accel_detection = s_config.enable_accel_detection;
// Validate configuration
if (s_config.sampling_frequency_hz <= 0.0f || s_config.sampling_frequency_hz > 1000.0f)
{
ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz (valid range: 0.1 - 1000 Hz)",
s_config.sampling_frequency_hz);
return ESP_ERR_INVALID_ARG;
}
// Initialize circular buffer
s_circular_buffer.buffer_size = CIRCULAR_BUFFER_SIZE;
s_circular_buffer.write_ptr = 0; // 写头初始化为0
s_circular_buffer.read_ptr = 0; // 读头初始化为0
s_circular_buffer.overwrite_count = 0;
s_circular_buffer.total_writes = 0;
s_circular_buffer.total_reads = 0;
// Allocate circular buffer from PSRAM (for large buffer)
size_t required_bytes = CIRCULAR_BUFFER_SIZE * sizeof(rt_process_sample_t);
size_t psram_free = heap_caps_get_free_size(MALLOC_CAP_SPIRAM);
if (psram_free < required_bytes)
{
ESP_LOGE(TAG, "PSRAM insufficient: available %zu bytes, need %zu bytes",
psram_free, required_bytes);
return ESP_ERR_NO_MEM;
}
s_circular_buffer.buffer = (rt_process_sample_t *)heap_caps_malloc(
required_bytes,
MALLOC_CAP_SPIRAM // Use PSRAM for large buffer
);
if (s_circular_buffer.buffer == NULL)
{
ESP_LOGE(TAG, "Failed to allocate circular buffer from PSRAM");
ESP_LOGE(TAG, "Available PSRAM: %zu bytes", heap_caps_get_free_size(MALLOC_CAP_SPIRAM));
return ESP_ERR_NO_MEM;
}
// Initialize buffer to zero
memset(s_circular_buffer.buffer, 0, required_bytes);
// Create mutex for buffer synchronization
s_circular_buffer.mutex = xSemaphoreCreateMutex();
if (s_circular_buffer.mutex == NULL)
{
ESP_LOGE(TAG, "Failed to create mutex");
heap_caps_free(s_circular_buffer.buffer);
s_circular_buffer.buffer = NULL;
return ESP_ERR_NO_MEM;
}
ESP_LOGI(TAG, "Circular buffer allocated: %u samples (%zu bytes) from PSRAM",
CIRCULAR_BUFFER_SIZE, required_bytes);
// Initialize statistics
memset(&s_stats, 0, sizeof(rt_process_stats_t));
s_is_initialized = true;
ESP_LOGI(TAG, "Producer-Consumer architecture initialized (Circular Buffer Mode):");
ESP_LOGI(TAG, " - Sampling frequency: %.2f Hz", s_config.sampling_frequency_hz);
ESP_LOGI(TAG, " - Circular buffer size: %u samples", CIRCULAR_BUFFER_SIZE);
ESP_LOGI(TAG, " - Consumer process interval: %u ms", CONSUMER_PROCESS_INTERVAL_MS);
ESP_LOGI(TAG, " - MQTT enabled: %s", s_config.enable_mqtt ? "Yes" : "No");
ESP_LOGI(TAG, " - Serial output enabled: %s", s_config.enable_serial ? "Yes" : "No");
return ESP_OK;
}
esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle)
{
if (handle == NULL)
{
return ESP_ERR_INVALID_ARG;
}
s_adxl355_handle = handle;
ESP_LOGI(TAG, "Sensor handle set");
return ESP_OK;
}
esp_err_t arch_pc_start(void)
{
if (!s_is_initialized)
{
ESP_LOGE(TAG, "Architecture not initialized");
return ESP_ERR_INVALID_STATE;
}
if (s_is_running)
{
ESP_LOGW(TAG, "Architecture already running");
return ESP_ERR_INVALID_STATE;
}
if (s_adxl355_handle == NULL)
{
ESP_LOGE(TAG, "Sensor handle not set");
return ESP_ERR_INVALID_STATE;
}
// Calculate timer period
uint64_t period_us = (uint64_t)(1000000.0f / s_config.sampling_frequency_hz);
if (period_us < 100)
{
ESP_LOGE(TAG, "Sampling frequency too high: %.2f Hz", s_config.sampling_frequency_hz);
return ESP_ERR_INVALID_ARG;
}
// Create sampling timer
esp_timer_create_args_t timer_args = {
.callback = sampling_timer_callback,
.arg = NULL,
.name = "rt_pc_sampling_timer"
};
esp_err_t ret = esp_timer_create(&timer_args, &s_sampling_timer);
if (ret != ESP_OK)
{
ESP_LOGE(TAG, "Failed to create timer: %s", esp_err_to_name(ret));
return ret;
}
// Set running flag
s_is_running = true;
// Create producer task (high priority)
BaseType_t task_ret = xTaskCreate(
producer_task,
"rt_pc_producer",
RT_PC_PRODUCER_STACK_SIZE,
NULL,
RT_PC_PRODUCER_PRIORITY,
&s_producer_task_handle
);
if (task_ret != pdPASS)
{
ESP_LOGE(TAG, "Failed to create producer task");
esp_timer_delete(s_sampling_timer);
s_sampling_timer = NULL;
s_is_running = false;
return ESP_ERR_NO_MEM;
}
// Create consumer task (medium priority)
task_ret = xTaskCreate(
consumer_task,
"rt_pc_consumer",
RT_PC_CONSUMER_STACK_SIZE,
NULL,
RT_PC_CONSUMER_PRIORITY,
&s_consumer_task_handle
);
if (task_ret != pdPASS)
{
ESP_LOGE(TAG, "Failed to create consumer task");
vTaskDelete(s_producer_task_handle);
s_producer_task_handle = NULL;
esp_timer_delete(s_sampling_timer);
s_sampling_timer = NULL;
s_is_running = false;
return ESP_ERR_NO_MEM;
}
// Perform immediate first sample
sampling_timer_callback(NULL);
// Start periodic timer
ret = esp_timer_start_periodic(s_sampling_timer, period_us);
if (ret != ESP_OK)
{
ESP_LOGE(TAG, "Failed to start timer: %s", esp_err_to_name(ret));
vTaskDelete(s_producer_task_handle);
vTaskDelete(s_consumer_task_handle);
s_producer_task_handle = NULL;
s_consumer_task_handle = NULL;
esp_timer_delete(s_sampling_timer);
s_sampling_timer = NULL;
s_is_running = false;
return ret;
}
ESP_LOGI(TAG, "Producer-Consumer architecture started (frequency: %.2f Hz, period: %llu us)",
s_config.sampling_frequency_hz, period_us);
return ESP_OK;
}
esp_err_t arch_pc_stop(void)
{
if (!s_is_running)
{
ESP_LOGW(TAG, "Architecture not running");
return ESP_ERR_INVALID_STATE;
}
s_is_running = false;
// Stop timer
if (s_sampling_timer != NULL)
{
esp_timer_stop(s_sampling_timer);
esp_timer_delete(s_sampling_timer);
s_sampling_timer = NULL;
}
// Tasks will exit when s_is_running becomes false
// Wait a bit for tasks to finish
vTaskDelay(pdMS_TO_TICKS(100));
// Force delete tasks if still running
if (s_producer_task_handle != NULL)
{
vTaskDelete(s_producer_task_handle);
s_producer_task_handle = NULL;
}
if (s_consumer_task_handle != NULL)
{
vTaskDelete(s_consumer_task_handle);
s_consumer_task_handle = NULL;
}
ESP_LOGI(TAG, "Producer-Consumer architecture stopped");
return ESP_OK;
}
esp_err_t arch_pc_deinit(void)
{
if (s_is_running)
{
arch_pc_stop();
}
// Delete mutex
if (s_circular_buffer.mutex != NULL)
{
vSemaphoreDelete(s_circular_buffer.mutex);
s_circular_buffer.mutex = NULL;
}
// Free circular buffer (PSRAM)
if (s_circular_buffer.buffer != NULL)
{
heap_caps_free(s_circular_buffer.buffer);
s_circular_buffer.buffer = NULL;
}
s_circular_buffer.buffer_size = 0;
s_circular_buffer.write_ptr = 0;
s_circular_buffer.read_ptr = 0;
s_circular_buffer.overwrite_count = 0;
s_circular_buffer.total_writes = 0;
s_circular_buffer.total_reads = 0;
s_is_initialized = false;
s_adxl355_handle = NULL;
memset(&s_stats, 0, sizeof(rt_process_stats_t));
ESP_LOGI(TAG, "Producer-Consumer architecture deinitialized");
return ESP_OK;
}
esp_err_t arch_pc_get_status(rt_process_status_t *status)
{
if (status == NULL)
{
return ESP_ERR_INVALID_ARG;
}
status->is_running = s_is_running;
status->arch_type = RT_ARCH_PRODUCER_CONSUMER;
status->sampling_frequency_hz = s_config.sampling_frequency_hz;
// For circular buffer, queue_usage represents buffer fill status
if (s_circular_buffer.buffer != NULL)
{
// Calculate buffer usage based on write_ptr and read_ptr
uint32_t write_ptr = s_circular_buffer.write_ptr;
uint32_t read_ptr = s_circular_buffer.read_ptr;
if (write_ptr >= read_ptr)
{
status->queue_usage = write_ptr - read_ptr; // Normal case
}
else
{
status->queue_usage = (s_circular_buffer.buffer_size - read_ptr) + write_ptr; // Wrapped around
}
}
else
{
status->queue_usage = 0;
}
status->buffer_usage = status->queue_usage; // Same as queue_usage for circular buffer
return ESP_OK;
}
esp_err_t arch_pc_get_stats(rt_process_stats_t *stats)
{
if (stats == NULL)
{
return ESP_ERR_INVALID_ARG;
}
memcpy(stats, &s_stats, sizeof(rt_process_stats_t));
return ESP_OK;
}
esp_err_t arch_pc_set_frequency(float frequency_hz)
{
if (frequency_hz <= 0.0f || frequency_hz > 1000.0f)
{
ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz", frequency_hz);
return ESP_ERR_INVALID_ARG;
}
bool was_running = s_is_running;
if (was_running)
{
arch_pc_stop();
}
s_config.sampling_frequency_hz = frequency_hz;
ESP_LOGI(TAG, "Sampling frequency updated to %.2f Hz", frequency_hz);
if (was_running)
{
return arch_pc_start();
}
return ESP_OK;
}
esp_err_t arch_pc_is_running(bool *is_running)
{
if (is_running == NULL)
{
return ESP_ERR_INVALID_ARG;
}
*is_running = s_is_running;
return ESP_OK;
}
/**
* @brief Enable or disable acceleration detection with LCD feedback
* @param enable true to enable detection, false to disable
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_pc_set_accel_detection(bool enable)
{
s_enable_accel_detection = enable;
if (enable)
{
// Initialize LCD to white (normal state)
lcd_clear(WHITE);
s_current_lcd_color = WHITE;
s_color_hold_active = false;
s_color_hold_until_us = 0;
ESP_LOGI(TAG, "Acceleration detection enabled with LCD feedback (0.3s persistence)");
}
else
{
// Reset LCD to white when disabled
lcd_clear(WHITE);
s_current_lcd_color = WHITE;
s_color_hold_active = false;
s_color_hold_until_us = 0;
ESP_LOGI(TAG, "Acceleration detection disabled");
}
return ESP_OK;
}
Key Implementation¶
Timer Callback¶
The timer callback uses xTaskNotify to notify the producer task, avoiding blocking operations in the timer context:
static void sampling_timer_callback(void *arg)
{
if (!s_is_running || s_adxl355_handle == NULL)
{
return;
}
// Notify producer task to read sensor
if (s_producer_task_handle != NULL)
{
xTaskNotify(s_producer_task_handle, 1, eSetBits);
}
}
Producer Task¶
The producer task waits for timer notifications, reads sensor data, and writes to the circular buffer:
- Non-blocking: Uses
xTaskNotifyWaitto wait for timer notifications - Mutex Protection: Takes mutex with 10ms timeout to protect buffer access
- FIFO Overwrite: When buffer is full, advances read pointer to overwrite oldest data
- Statistics: Updates acquisition time and total sample count
Consumer Task¶
The consumer task periodically processes the circular buffer:
- Fixed Interval: Processes every 50ms
- Sample Extraction: Reads up to 10 most recent samples
- Processing: Performs feature extraction, detection, and output
- Statistics: Updates processing time and processed sample count
Circular Buffer Management¶
The circular buffer uses a mutex-protected write/read pointer mechanism:
- Write Pointer: Advanced by producer after writing
- Read Pointer: Advanced by consumer after reading
- Wrap-around: Both pointers wrap around when reaching buffer end
- Overwrite Detection: Tracks when write pointer catches read pointer
Usage Example¶
#include "real-time-process-arch.h"
// Select Architecture 1 (default)
// #define RT_PROCESS_ARCH_TYPE RT_ARCH_PRODUCER_CONSUMER
#include "real-time-process-arch.h"
// Initialize
rt_process_config_t config = {
.sampling_frequency_hz = 100.0f,
.queue_size = 50,
.enable_mqtt = false,
.enable_serial = true,
.enable_accel_detection = true,
.mqtt_topic = NULL
};
esp_err_t ret = rt_process_init(&config);
rt_process_set_sensor_handle(adxl355_handle);
rt_process_start();
// Monitor
rt_process_stats_t stats;
rt_process_get_stats(&stats);
ESP_LOGI("App", "Total: %lu, Processed: %lu, Dropped: %lu",
stats.total_samples, stats.processed_samples, stats.dropped_samples);
// Stop
rt_process_stop();
rt_process_deinit();