Skip to content

CODE

Warning

The following code should be based on the code in the release code, which may have been updated.

arch_producer_consumer.h

/**
 * @file arch_producer_consumer.h
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief Architecture 1: Producer-Consumer Queue Implementation
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 * @details
 * This architecture uses a producer-consumer pattern with circular buffer:
 * - Producer task (high priority): Reads sensor data and writes to circular buffer
 * - Consumer task (medium priority): Reads from circular buffer and processes data
 * - Circular buffer with mutex provides synchronization
 *
 * Advantages:
 * - Simple implementation
 * - Decouples acquisition and processing
 * - Suitable for medium frequency (0.1-1000 Hz, recommended: 100 Hz - 1 kHz)
 */

#pragma once

#include "arch_common.h"  // Includes rt_process_config_t
#include "node_acc_adxl355.h"
#include "esp_err.h"

#ifdef __cplusplus
extern "C"
{
#endif

/* ============================================================================
 * CONFIGURATION
 * ============================================================================ */

/**
 * @brief Default queue size for producer-consumer architecture
 * @note Uses TINY_MEASUREMENT_RT_PC_QUEUE_SIZE_DEFAULT from tiny_measurement_config.h
 */
#ifndef RT_PC_QUEUE_SIZE_DEFAULT
#define RT_PC_QUEUE_SIZE_DEFAULT TINY_MEASUREMENT_RT_PC_QUEUE_SIZE_DEFAULT
#endif

/**
 * @brief Producer task priority (high priority for timely acquisition)
 * @note Uses TINY_MEASUREMENT_RT_PC_PRODUCER_PRIORITY from tiny_measurement_config.h
 */
#ifndef RT_PC_PRODUCER_PRIORITY
#define RT_PC_PRODUCER_PRIORITY TINY_MEASUREMENT_RT_PC_PRODUCER_PRIORITY
#endif

/**
 * @brief Consumer task priority (high priority for processing, close to producer)
 * @note Uses TINY_MEASUREMENT_RT_PC_CONSUMER_PRIORITY from tiny_measurement_config.h
 */
#ifndef RT_PC_CONSUMER_PRIORITY
#define RT_PC_CONSUMER_PRIORITY TINY_MEASUREMENT_RT_PC_CONSUMER_PRIORITY
#endif

/**
 * @brief Producer task stack size
 * @note Uses TINY_MEASUREMENT_RT_PC_PRODUCER_STACK_SIZE from tiny_measurement_config.h
 */
#ifndef RT_PC_PRODUCER_STACK_SIZE
#define RT_PC_PRODUCER_STACK_SIZE TINY_MEASUREMENT_RT_PC_PRODUCER_STACK_SIZE
#endif

/**
 * @brief Consumer task stack size
 * @note Uses TINY_MEASUREMENT_RT_PC_CONSUMER_STACK_SIZE from tiny_measurement_config.h
 */
#ifndef RT_PC_CONSUMER_STACK_SIZE
#define RT_PC_CONSUMER_STACK_SIZE TINY_MEASUREMENT_RT_PC_CONSUMER_STACK_SIZE
#endif

/* ============================================================================
 * FUNCTION DECLARATIONS
 * ============================================================================ */

/**
 * @brief Initialize producer-consumer architecture
 * @param config Configuration structure
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_init(const rt_process_config_t *config);

/**
 * @brief Set sensor handle for producer-consumer architecture
 * @param handle ADXL355 sensor handle
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle);

/**
 * @brief Start producer-consumer architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_start(void);

/**
 * @brief Stop producer-consumer architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_stop(void);

/**
 * @brief Deinitialize producer-consumer architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_deinit(void);

/**
 * @brief Get status of producer-consumer architecture
 * @param status Output parameter for status
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_get_status(rt_process_status_t *status);

/**
 * @brief Get statistics of producer-consumer architecture
 * @param stats Output parameter for statistics
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_get_stats(rt_process_stats_t *stats);

/**
 * @brief Set sampling frequency for producer-consumer architecture
 * @param frequency_hz New sampling frequency in Hz
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_set_frequency(float frequency_hz);

/**
 * @brief Check if producer-consumer architecture is running
 * @param is_running Output parameter: true if running
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_is_running(bool *is_running);

/**
 * @brief Enable or disable acceleration detection with LCD feedback
 * @param enable true to enable detection, false to disable
 * @return ESP_OK on success, error code on failure
 * @note When enabled, LCD will show RED when conditions are met, WHITE otherwise
 * @note Conditions: |x| > 0.5g OR |y| > 0.5g OR |z| < 0.5g
 */
esp_err_t arch_pc_set_accel_detection(bool enable);

#ifdef __cplusplus
}
#endif

arch_producer_consumer.c

/**
 * @file arch_producer_consumer.c
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief Architecture 1: Producer-Consumer Queue Implementation
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 */

#include "arch_producer_consumer.h"
#include "node_mqtt.h"  // Provides s_mqtt_client, s_is_mqtt_connected, MQTT_PUBLISH_TOPIC
#include "node_lcd.h"   // LCD control for acceleration detection
#include "esp_log.h"
#include "esp_timer.h"
#include "esp_heap_caps.h"  // For PSRAM allocation
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include <string.h>
#include <stdio.h>
#include <math.h>

/* ============================================================================
 * PRIVATE DEFINITIONS
 * ============================================================================ */

static const char *TAG = "RTArch_PC";

/* ============================================================================
 * PRIVATE VARIABLES
 * ============================================================================ */

static rt_process_config_t s_config = {
    .sampling_frequency_hz = 100.0f,
    .queue_size = RT_PC_QUEUE_SIZE_DEFAULT,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = false,
    .mqtt_topic = NULL
};

// Circular buffer structure
typedef struct {
    rt_process_sample_t *buffer;      // Data buffer (PSRAM)
    uint32_t buffer_size;              // Buffer size (number of samples)
    volatile uint32_t write_ptr;       // Write pointer (producer) - 写头
    volatile uint32_t read_ptr;        // Read pointer (consumer) - 读头
    SemaphoreHandle_t mutex;          // Mutex for synchronization
    uint32_t overwrite_count;          // Count of overwrites (monitoring)
    uint32_t total_writes;            // Total number of writes
    uint32_t total_reads;             // Total number of reads
} rt_circular_buffer_t;

static rt_circular_buffer_t s_circular_buffer = {0};
static TaskHandle_t s_producer_task_handle = NULL;
static TaskHandle_t s_consumer_task_handle = NULL;
static esp_timer_handle_t s_sampling_timer = NULL;
static bool s_is_running = false;
static bool s_is_initialized = false;
static adxl355_handle_t *s_adxl355_handle = NULL;

// Statistics
static rt_process_stats_t s_stats = {0};

// Acceleration detection state
static bool s_enable_accel_detection = false;  // Enable acceleration detection and LCD control
static uint16_t s_current_lcd_color = 0xFFFF; // Current LCD color (WHITE)
static uint64_t s_last_lcd_update_us = 0;
static uint64_t s_color_hold_until_us = 0;    // Time until which color should be held (for persistence)
static bool s_color_hold_active = false;      // Whether color hold is currently active
static bool s_lcd_update_in_progress = false; // Flag to prevent concurrent LCD updates

// LCD color hold duration: 0.3 seconds
#define LCD_COLOR_HOLD_DURATION_US 300000U  // 0.3 seconds (300ms) in microseconds
// LCD update minimum interval to avoid SPI bus conflict
#define LCD_UPDATE_MIN_INTERVAL_US 100000U  // Minimum 100ms between LCD updates

// Detection thresholds
#define ACCEL_THRESHOLD_X 0.5f  // Condition 1: |x| > 0.5g
#define ACCEL_THRESHOLD_Y 0.5f  // Condition 2: |y| > 0.5g
#define ACCEL_THRESHOLD_Z 0.5f  // Condition 3: |z| < 0.5g

// Circular buffer configuration
#define CIRCULAR_BUFFER_SIZE 512  // Buffer size: 512 samples
#define CONSUMER_PROCESS_INTERVAL_MS 50  // Consumer processes buffer every 50ms
#define CONSUMER_PROCESS_SAMPLE_COUNT 10  // Process last 10 samples for real-time response

/* ============================================================================
 * PRIVATE FUNCTION DECLARATIONS
 * ============================================================================ */

static void sampling_timer_callback(void *arg);
static void producer_task(void *pvParameters);
static void consumer_task(void *pvParameters);
static void update_stats_acquisition(uint64_t time_us);
static void update_stats_processing(uint64_t time_us);

/* ============================================================================
 * PRIVATE FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

/**
 * @brief Timer callback for periodic sampling (triggers producer task)
 */
static void sampling_timer_callback(void *arg)
{
    if (!s_is_running || s_adxl355_handle == NULL)
    {
        return;
    }

    // Notify producer task to read sensor
    if (s_producer_task_handle != NULL)
    {
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}

/**
 * @brief Producer task: Reads sensor data and enqueues
 */
static void producer_task(void *pvParameters)
{
    rt_process_sample_t sample;
    uint32_t notification_value;
    int64_t start_time;

    ESP_LOGI(TAG, "Producer task started");

    while (s_is_running)
    {
        // Wait for timer notification
        if (xTaskNotifyWait(0, ULONG_MAX, &notification_value, portMAX_DELAY) == pdTRUE)
        {
            if (!s_is_running || s_adxl355_handle == NULL)
            {
                break;
            }

            start_time = esp_timer_get_time();

            // Read sensor data
            adxl355_accelerations_t accel;
            float temperature;
            esp_err_t accel_ret = adxl355_read_accelerations(s_adxl355_handle, &accel);
            esp_err_t temp_ret = adxl355_read_temperature(s_adxl355_handle, &temperature);

            if (accel_ret == ESP_OK && temp_ret == ESP_OK)
            {
                // Prepare sample
                sample.x = accel.x;
                sample.y = accel.y;
                sample.z = accel.z;
                sample.temp = temperature;
                sample.timestamp_us = esp_timer_get_time();

                // Update acquisition statistics
                update_stats_acquisition(esp_timer_get_time() - start_time);

                // Write to circular buffer (FIFO overwrite)
                if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
                {
                    // Take mutex to protect buffer access
                    if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(10)) == pdTRUE)
                    {
                        // Write sample to current write position
                        uint32_t write_idx = s_circular_buffer.write_ptr;
                        s_circular_buffer.buffer[write_idx] = sample;

                        // Update write pointer (circular)
                        uint32_t next_write_ptr = (write_idx + 1) % s_circular_buffer.buffer_size;

                        // Check if write pointer will catch up with read pointer (buffer full)
                        if (next_write_ptr == s_circular_buffer.read_ptr && s_circular_buffer.total_writes >= s_circular_buffer.buffer_size)
                        {
                            // Buffer full: advance read pointer (FIFO overwrite)
                            s_circular_buffer.read_ptr = (s_circular_buffer.read_ptr + 1) % s_circular_buffer.buffer_size;
                            s_circular_buffer.overwrite_count++;
                        }

                        s_circular_buffer.write_ptr = next_write_ptr;
                        s_circular_buffer.total_writes++;

                        xSemaphoreGive(s_circular_buffer.mutex);
                        s_stats.total_samples++;
                    }
                    else
                    {
                        // Mutex timeout - drop sample
                        s_stats.dropped_samples++;
                        ESP_LOGW(TAG, "Mutex timeout, sample dropped");
                    }
                }
                else
                {
                    s_stats.dropped_samples++;
                    ESP_LOGW(TAG, "Circular buffer not initialized, sample dropped");
                }
            }
            else
            {
                ESP_LOGE(TAG, "Failed to read sensor: accel=%s, temp=%s",
                         esp_err_to_name(accel_ret), esp_err_to_name(temp_ret));
            }
        }
    }

    ESP_LOGI(TAG, "Producer task stopped");
    vTaskDelete(NULL);
}

/**
 * @brief Consumer task: Periodically extracts entire buffer snapshot for analysis
 */
static void consumer_task(void *pvParameters)
{
    // Local buffer to store last N samples (fast access)
    rt_process_sample_t recent_samples[CONSUMER_PROCESS_SAMPLE_COUNT];

    ESP_LOGI(TAG, "Consumer task started (circular buffer mode, processing last %d samples)",
             CONSUMER_PROCESS_SAMPLE_COUNT);

    while (s_is_running)
    {
        // Wait for processing interval
        vTaskDelay(pdMS_TO_TICKS(CONSUMER_PROCESS_INTERVAL_MS));

        if (!s_is_running)
        {
            break;
        }

        // Extract entire buffer snapshot
        if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
        {
            int64_t process_start_time = esp_timer_get_time();

            // Take mutex to protect buffer access
            if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(50)) == pdTRUE)
            {
                // Standard circular buffer read: read from read_ptr
                uint32_t write_ptr = s_circular_buffer.write_ptr;
                uint32_t read_ptr = s_circular_buffer.read_ptr;
                uint32_t overwrite_count = s_circular_buffer.overwrite_count;

                // Calculate available samples (distance between read_ptr and write_ptr)
                uint32_t available_samples;
                if (write_ptr >= read_ptr)
                {
                    available_samples = write_ptr - read_ptr;  // Normal case
                }
                else
                {
                    available_samples = (s_circular_buffer.buffer_size - read_ptr) + write_ptr;  // Wrapped around
                }

                // Read samples from read pointer (up to CONSUMER_PROCESS_SAMPLE_COUNT)
                uint32_t samples_to_read = (available_samples > CONSUMER_PROCESS_SAMPLE_COUNT) ? 
                                           CONSUMER_PROCESS_SAMPLE_COUNT : available_samples;

                if (samples_to_read > 0)
                {
                    // Copy samples starting from read pointer
                    for (uint32_t i = 0; i < samples_to_read; i++)
                    {
                        uint32_t src_idx = (read_ptr + i) % s_circular_buffer.buffer_size;
                        recent_samples[i] = s_circular_buffer.buffer[src_idx];
                    }

                    // Update read pointer (advance read head)
                    s_circular_buffer.read_ptr = (read_ptr + samples_to_read) % s_circular_buffer.buffer_size;
                    s_circular_buffer.total_reads += samples_to_read;

                    // Fill remaining slots with zeros if not enough samples
                    for (uint32_t i = samples_to_read; i < CONSUMER_PROCESS_SAMPLE_COUNT; i++)
                    {
                        recent_samples[i].x = 0.0f;
                        recent_samples[i].y = 0.0f;
                        recent_samples[i].z = 0.0f;
                        recent_samples[i].temp = 0.0f;
                        recent_samples[i].timestamp_us = 0;
                    }
                }
                else
                {
                    // No data available, fill with zeros
                    memset(recent_samples, 0, CONSUMER_PROCESS_SAMPLE_COUNT * sizeof(rt_process_sample_t));
                }

                xSemaphoreGive(s_circular_buffer.mutex);

                // Now process the recent samples
                // This is where you can add FFT, filtering, feature extraction, etc.

                // Update processing statistics
                uint64_t process_time_us = esp_timer_get_time() - process_start_time;
                update_stats_processing(process_time_us);
                // Only count actual samples read
                uint32_t actual_samples = samples_to_read;
                s_stats.processed_samples += actual_samples;

                // Use actual_samples for processing
                uint32_t samples_to_process = actual_samples > 0 ? actual_samples : CONSUMER_PROCESS_SAMPLE_COUNT;

                // Acceleration detection based on recent samples
                if (s_enable_accel_detection)
                {
                    // Check if any of the recent samples meets conditions
                    bool condition_met = false;
                    for (uint32_t i = 0; i < samples_to_process; i++)
                    {
                        if ((fabsf(recent_samples[i].x) > ACCEL_THRESHOLD_X) ||
                            (fabsf(recent_samples[i].y) > ACCEL_THRESHOLD_Y) ||
                            (fabsf(recent_samples[i].z) < ACCEL_THRESHOLD_Z))
                        {
                            condition_met = true;
                            break;
                        }
                    }

                    // LCD control (same logic as before)
                    uint64_t current_time = esp_timer_get_time();
                    uint16_t target_color;
                    bool should_update = false;

                    if (condition_met)
                    {
                        target_color = RED;
                        if (!s_color_hold_active || s_current_lcd_color != RED)
                        {
                            s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                            s_color_hold_active = true;
                            should_update = true;
                            ESP_LOGI(TAG, "LCD -> RED (buffer analysis)");
                        }
                        else
                        {
                            s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                        }
                    }
                    else
                    {
                        if (s_color_hold_active && current_time < s_color_hold_until_us)
                        {
                            target_color = RED;
                        }
                        else
                        {
                            target_color = WHITE;
                            if (s_color_hold_active)
                            {
                                s_color_hold_active = false;
                                should_update = true;
                                ESP_LOGI(TAG, "LCD -> WHITE (hold expired)");
                            }
                        }
                    }

                    if (should_update && target_color != s_current_lcd_color)
                    {
                        // Check minimum interval and update in progress flag to avoid SPI bus conflict
                        uint64_t time_since_last_update = current_time - s_last_lcd_update_us;
                        if (!s_lcd_update_in_progress && time_since_last_update >= LCD_UPDATE_MIN_INTERVAL_US)
                        {
                            s_lcd_update_in_progress = true;
                            lcd_clear(target_color);
                            s_current_lcd_color = target_color;
                            s_last_lcd_update_us = current_time;
                            s_lcd_update_in_progress = false;
                        }
                        else
                        {
                            if (s_lcd_update_in_progress)
                            {
                                ESP_LOGD(TAG, "LCD update skipped: previous update still in progress");
                            }
                            else
                            {
                                ESP_LOGD(TAG, "LCD update skipped: too frequent (last update %llu us ago)",
                                         time_since_last_update);
                            }
                        }
                    }
                }

                // Output analysis results
                if (s_config.enable_serial)
                {
                    // Output recent samples processing info
                    printf("RECENT_SAMPLES: count=%u, overwrites=%lu, process_time=%llu us\n",
                           (unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT,
                           (unsigned long)overwrite_count, process_time_us);
                }

                // Log periodically
                static uint32_t log_counter = 0;
                if (++log_counter >= 20)  // Log every 20 processing cycles (1 second at 50ms interval)
                {
                    ESP_LOGI(TAG, "Recent samples processed: %u samples, overwrites=%lu, process_time=%llu us",
                             (unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT, (unsigned long)overwrite_count, process_time_us);
                    log_counter = 0;
                }
            }
            else
            {
                ESP_LOGW(TAG, "Failed to acquire mutex for buffer snapshot");
            }
        }
    }

    ESP_LOGI(TAG, "Consumer task stopped");
    vTaskDelete(NULL);
}

/**
 * @brief Update acquisition time statistics
 */
static void update_stats_acquisition(uint64_t time_us)
{
    // Simple moving average
    if (s_stats.total_samples == 0)
    {
        s_stats.avg_acquisition_time_us = (float)time_us;
    }
    else
    {
        s_stats.avg_acquisition_time_us = 
            (s_stats.avg_acquisition_time_us * 0.9f) + ((float)time_us * 0.1f);
    }
}

/**
 * @brief Update processing time statistics
 */
static void update_stats_processing(uint64_t time_us)
{
    // Simple moving average
    if (s_stats.processed_samples == 0)
    {
        s_stats.avg_process_time_us = (float)time_us;
    }
    else
    {
        s_stats.avg_process_time_us = 
            (s_stats.avg_process_time_us * 0.9f) + ((float)time_us * 0.1f);
    }
}

/* ============================================================================
 * PUBLIC FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

esp_err_t arch_pc_init(const rt_process_config_t *config)
{
    if (s_is_initialized)
    {
        ESP_LOGW(TAG, "Architecture already initialized");
        return ESP_ERR_INVALID_STATE;
    }

    // Apply configuration
    if (config != NULL)
    {
        memcpy(&s_config, config, sizeof(rt_process_config_t));
    }
    else
    {
        // Use defaults
        s_config.sampling_frequency_hz = 100.0f;
        s_config.queue_size = RT_PC_QUEUE_SIZE_DEFAULT;
        s_config.enable_mqtt = false;  // Default: MQTT disabled
        s_config.enable_serial = true;  // Default: Serial enabled
        s_config.enable_accel_detection = false;  // Default: Detection disabled
        s_config.mqtt_topic = NULL;
    }

    // Set acceleration detection state from config
    s_enable_accel_detection = s_config.enable_accel_detection;

    // Validate configuration
    if (s_config.sampling_frequency_hz <= 0.0f || s_config.sampling_frequency_hz > 1000.0f)
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz (valid range: 0.1 - 1000 Hz)",
                 s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    // Initialize circular buffer
    s_circular_buffer.buffer_size = CIRCULAR_BUFFER_SIZE;
    s_circular_buffer.write_ptr = 0;   // 写头初始化为0
    s_circular_buffer.read_ptr = 0;    // 读头初始化为0
    s_circular_buffer.overwrite_count = 0;
    s_circular_buffer.total_writes = 0;
    s_circular_buffer.total_reads = 0;

    // Allocate circular buffer from PSRAM (for large buffer)
    size_t required_bytes = CIRCULAR_BUFFER_SIZE * sizeof(rt_process_sample_t);
    size_t psram_free = heap_caps_get_free_size(MALLOC_CAP_SPIRAM);

    if (psram_free < required_bytes)
    {
        ESP_LOGE(TAG, "PSRAM insufficient: available %zu bytes, need %zu bytes",
                 psram_free, required_bytes);
        return ESP_ERR_NO_MEM;
    }

    s_circular_buffer.buffer = (rt_process_sample_t *)heap_caps_malloc(
        required_bytes,
        MALLOC_CAP_SPIRAM  // Use PSRAM for large buffer
    );

    if (s_circular_buffer.buffer == NULL)
    {
        ESP_LOGE(TAG, "Failed to allocate circular buffer from PSRAM");
        ESP_LOGE(TAG, "Available PSRAM: %zu bytes", heap_caps_get_free_size(MALLOC_CAP_SPIRAM));
        return ESP_ERR_NO_MEM;
    }

    // Initialize buffer to zero
    memset(s_circular_buffer.buffer, 0, required_bytes);

    // Create mutex for buffer synchronization
    s_circular_buffer.mutex = xSemaphoreCreateMutex();
    if (s_circular_buffer.mutex == NULL)
    {
        ESP_LOGE(TAG, "Failed to create mutex");
        heap_caps_free(s_circular_buffer.buffer);
        s_circular_buffer.buffer = NULL;
        return ESP_ERR_NO_MEM;
    }

    ESP_LOGI(TAG, "Circular buffer allocated: %u samples (%zu bytes) from PSRAM",
             CIRCULAR_BUFFER_SIZE, required_bytes);

    // Initialize statistics
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    s_is_initialized = true;

    ESP_LOGI(TAG, "Producer-Consumer architecture initialized (Circular Buffer Mode):");
    ESP_LOGI(TAG, "  - Sampling frequency: %.2f Hz", s_config.sampling_frequency_hz);
    ESP_LOGI(TAG, "  - Circular buffer size: %u samples", CIRCULAR_BUFFER_SIZE);
    ESP_LOGI(TAG, "  - Consumer process interval: %u ms", CONSUMER_PROCESS_INTERVAL_MS);
    ESP_LOGI(TAG, "  - MQTT enabled: %s", s_config.enable_mqtt ? "Yes" : "No");
    ESP_LOGI(TAG, "  - Serial output enabled: %s", s_config.enable_serial ? "Yes" : "No");

    return ESP_OK;
}

esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle)
{
    if (handle == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    s_adxl355_handle = handle;
    ESP_LOGI(TAG, "Sensor handle set");
    return ESP_OK;
}

esp_err_t arch_pc_start(void)
{
    if (!s_is_initialized)
    {
        ESP_LOGE(TAG, "Architecture not initialized");
        return ESP_ERR_INVALID_STATE;
    }

    if (s_is_running)
    {
        ESP_LOGW(TAG, "Architecture already running");
        return ESP_ERR_INVALID_STATE;
    }

    if (s_adxl355_handle == NULL)
    {
        ESP_LOGE(TAG, "Sensor handle not set");
        return ESP_ERR_INVALID_STATE;
    }

    // Calculate timer period
    uint64_t period_us = (uint64_t)(1000000.0f / s_config.sampling_frequency_hz);
    if (period_us < 100)
    {
        ESP_LOGE(TAG, "Sampling frequency too high: %.2f Hz", s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    // Create sampling timer
    esp_timer_create_args_t timer_args = {
        .callback = sampling_timer_callback,
        .arg = NULL,
        .name = "rt_pc_sampling_timer"
    };

    esp_err_t ret = esp_timer_create(&timer_args, &s_sampling_timer);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to create timer: %s", esp_err_to_name(ret));
        return ret;
    }

    // Set running flag
    s_is_running = true;

    // Create producer task (high priority)
    BaseType_t task_ret = xTaskCreate(
        producer_task,
        "rt_pc_producer",
        RT_PC_PRODUCER_STACK_SIZE,
        NULL,
        RT_PC_PRODUCER_PRIORITY,
        &s_producer_task_handle
    );

    if (task_ret != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create producer task");
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ESP_ERR_NO_MEM;
    }

    // Create consumer task (medium priority)
    task_ret = xTaskCreate(
        consumer_task,
        "rt_pc_consumer",
        RT_PC_CONSUMER_STACK_SIZE,
        NULL,
        RT_PC_CONSUMER_PRIORITY,
        &s_consumer_task_handle
    );

    if (task_ret != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create consumer task");
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ESP_ERR_NO_MEM;
    }

    // Perform immediate first sample
    sampling_timer_callback(NULL);

    // Start periodic timer
    ret = esp_timer_start_periodic(s_sampling_timer, period_us);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to start timer: %s", esp_err_to_name(ret));
        vTaskDelete(s_producer_task_handle);
        vTaskDelete(s_consumer_task_handle);
        s_producer_task_handle = NULL;
        s_consumer_task_handle = NULL;
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ret;
    }

    ESP_LOGI(TAG, "Producer-Consumer architecture started (frequency: %.2f Hz, period: %llu us)",
             s_config.sampling_frequency_hz, period_us);

    return ESP_OK;
}

esp_err_t arch_pc_stop(void)
{
    if (!s_is_running)
    {
        ESP_LOGW(TAG, "Architecture not running");
        return ESP_ERR_INVALID_STATE;
    }

    s_is_running = false;

    // Stop timer
    if (s_sampling_timer != NULL)
    {
        esp_timer_stop(s_sampling_timer);
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
    }

    // Tasks will exit when s_is_running becomes false
    // Wait a bit for tasks to finish
    vTaskDelay(pdMS_TO_TICKS(100));

    // Force delete tasks if still running
    if (s_producer_task_handle != NULL)
    {
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
    }

    if (s_consumer_task_handle != NULL)
    {
        vTaskDelete(s_consumer_task_handle);
        s_consumer_task_handle = NULL;
    }

    ESP_LOGI(TAG, "Producer-Consumer architecture stopped");
    return ESP_OK;
}

esp_err_t arch_pc_deinit(void)
{
    if (s_is_running)
    {
        arch_pc_stop();
    }

    // Delete mutex
    if (s_circular_buffer.mutex != NULL)
    {
        vSemaphoreDelete(s_circular_buffer.mutex);
        s_circular_buffer.mutex = NULL;
    }

    // Free circular buffer (PSRAM)
    if (s_circular_buffer.buffer != NULL)
    {
        heap_caps_free(s_circular_buffer.buffer);
        s_circular_buffer.buffer = NULL;
    }

    s_circular_buffer.buffer_size = 0;
    s_circular_buffer.write_ptr = 0;
    s_circular_buffer.read_ptr = 0;
    s_circular_buffer.overwrite_count = 0;
    s_circular_buffer.total_writes = 0;
    s_circular_buffer.total_reads = 0;

    s_is_initialized = false;
    s_adxl355_handle = NULL;
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    ESP_LOGI(TAG, "Producer-Consumer architecture deinitialized");
    return ESP_OK;
}

esp_err_t arch_pc_get_status(rt_process_status_t *status)
{
    if (status == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    status->is_running = s_is_running;
    status->arch_type = RT_ARCH_PRODUCER_CONSUMER;
    status->sampling_frequency_hz = s_config.sampling_frequency_hz;

    // For circular buffer, queue_usage represents buffer fill status
    if (s_circular_buffer.buffer != NULL)
    {
        // Calculate buffer usage based on write_ptr and read_ptr
        uint32_t write_ptr = s_circular_buffer.write_ptr;
        uint32_t read_ptr = s_circular_buffer.read_ptr;

        if (write_ptr >= read_ptr)
        {
            status->queue_usage = write_ptr - read_ptr;  // Normal case
        }
        else
        {
            status->queue_usage = (s_circular_buffer.buffer_size - read_ptr) + write_ptr;  // Wrapped around
        }
    }
    else
    {
        status->queue_usage = 0;
    }

    status->buffer_usage = status->queue_usage;  // Same as queue_usage for circular buffer

    return ESP_OK;
}

esp_err_t arch_pc_get_stats(rt_process_stats_t *stats)
{
    if (stats == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    memcpy(stats, &s_stats, sizeof(rt_process_stats_t));
    return ESP_OK;
}

esp_err_t arch_pc_set_frequency(float frequency_hz)
{
    if (frequency_hz <= 0.0f || frequency_hz > 1000.0f)
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz", frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    bool was_running = s_is_running;
    if (was_running)
    {
        arch_pc_stop();
    }

    s_config.sampling_frequency_hz = frequency_hz;
    ESP_LOGI(TAG, "Sampling frequency updated to %.2f Hz", frequency_hz);

    if (was_running)
    {
        return arch_pc_start();
    }

    return ESP_OK;
}

esp_err_t arch_pc_is_running(bool *is_running)
{
    if (is_running == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    *is_running = s_is_running;
    return ESP_OK;
}

/**
 * @brief Enable or disable acceleration detection with LCD feedback
 * @param enable true to enable detection, false to disable
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_pc_set_accel_detection(bool enable)
{
    s_enable_accel_detection = enable;

    if (enable)
    {
        // Initialize LCD to white (normal state)
        lcd_clear(WHITE);
        s_current_lcd_color = WHITE;
        s_color_hold_active = false;
        s_color_hold_until_us = 0;
        ESP_LOGI(TAG, "Acceleration detection enabled with LCD feedback (0.3s persistence)");
    }
    else
    {
        // Reset LCD to white when disabled
        lcd_clear(WHITE);
        s_current_lcd_color = WHITE;
        s_color_hold_active = false;
        s_color_hold_until_us = 0;
        ESP_LOGI(TAG, "Acceleration detection disabled");
    }

    return ESP_OK;
}

Key Implementation

Timer Callback

The timer callback uses xTaskNotify to notify the producer task, avoiding blocking operations in the timer context:

static void sampling_timer_callback(void *arg)
{
    if (!s_is_running || s_adxl355_handle == NULL)
    {
        return;
    }

    // Notify producer task to read sensor
    if (s_producer_task_handle != NULL)
    {
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}

Producer Task

The producer task waits for timer notifications, reads sensor data, and writes to the circular buffer:

  • Non-blocking: Uses xTaskNotifyWait to wait for timer notifications
  • Mutex Protection: Takes mutex with 10ms timeout to protect buffer access
  • FIFO Overwrite: When buffer is full, advances read pointer to overwrite oldest data
  • Statistics: Updates acquisition time and total sample count

Consumer Task

The consumer task periodically processes the circular buffer:

  • Fixed Interval: Processes every 50ms
  • Sample Extraction: Reads up to 10 most recent samples
  • Processing: Performs feature extraction, detection, and output
  • Statistics: Updates processing time and processed sample count

Circular Buffer Management

The circular buffer uses a mutex-protected write/read pointer mechanism:

  • Write Pointer: Advanced by producer after writing
  • Read Pointer: Advanced by consumer after reading
  • Wrap-around: Both pointers wrap around when reaching buffer end
  • Overwrite Detection: Tracks when write pointer catches read pointer

Usage Example

#include "real-time-process-arch.h"

// Select Architecture 1 (default)
// #define RT_PROCESS_ARCH_TYPE RT_ARCH_PRODUCER_CONSUMER
#include "real-time-process-arch.h"

// Initialize
rt_process_config_t config = {
    .sampling_frequency_hz = 100.0f,
    .queue_size = 50,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = true,
    .mqtt_topic = NULL
};

esp_err_t ret = rt_process_init(&config);
rt_process_set_sensor_handle(adxl355_handle);
rt_process_start();

// Monitor
rt_process_stats_t stats;
rt_process_get_stats(&stats);
ESP_LOGI("App", "Total: %lu, Processed: %lu, Dropped: %lu",
         stats.total_samples, stats.processed_samples, stats.dropped_samples);

// Stop
rt_process_stop();
rt_process_deinit();