跳转至

代码

Warning

以下代码仅供参考,请以发布版本中的代码为准(发布版本可能已更新)。

arch_producer_consumer.h

/**
 * @file arch_producer_consumer.h
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief 架构1:Producer-Consumer 队列实现
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 * @details
 * 本架构使用 Producer-Consumer 模式和环形缓冲区:
 * - Producer 任务(高优先级):读取传感器数据并写入环形缓冲区
 * - Consumer 任务(中等优先级):从环形缓冲区读取并处理数据
 * - 环形缓冲区配合互斥锁提供同步
 *
 * 优势:
 * - 实现简单
 * - 解耦采集和处理
 * - 适用于中等频率(0.1-1000 Hz,推荐:100 Hz - 1 kHz)
 */

#pragma once

#include "arch_common.h"  // Includes rt_process_config_t
#include "node_acc_adxl355.h"
#include "esp_err.h"

#ifdef __cplusplus
extern "C"
{
#endif

/* ============================================================================
 * CONFIGURATION
 * ============================================================================ */

/**
 * @brief Default queue size for the Producer-Consumer architecture.
 * @note Can be overridden at build time by defining the macro beforehand.
 */
#ifndef RT_PC_QUEUE_SIZE_DEFAULT
#define RT_PC_QUEUE_SIZE_DEFAULT 50
#endif

/**
 * @brief Producer task priority (high, so acquisition happens promptly).
 */
#ifndef RT_PC_PRODUCER_PRIORITY
#define RT_PC_PRODUCER_PRIORITY 10
#endif

/**
 * @brief Consumer task priority (high for processing, close to the producer).
 * @note Set to 8 for fast processing while staying below the producer (10).
 */
#ifndef RT_PC_CONSUMER_PRIORITY
#define RT_PC_CONSUMER_PRIORITY 8
#endif

/**
 * @brief Stack size used when creating the producer task.
 */
#ifndef RT_PC_PRODUCER_STACK_SIZE
#define RT_PC_PRODUCER_STACK_SIZE 4096
#endif

/**
 * @brief Stack size used when creating the consumer task.
 */
#ifndef RT_PC_CONSUMER_STACK_SIZE
#define RT_PC_CONSUMER_STACK_SIZE 8192
#endif

/* ============================================================================
 * FUNCTION DECLARATIONS
 * ============================================================================ */

/**
 * @brief Initialize the Producer-Consumer architecture.
 * @param config Configuration structure (the implementation falls back to
 *               built-in defaults when this is NULL)
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_init(const rt_process_config_t *config);

/**
 * @brief Set the sensor handle used by the Producer-Consumer architecture.
 * @param handle ADXL355 sensor handle (must not be NULL)
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle);

/**
 * @brief Start the Producer-Consumer architecture.
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_start(void);

/**
 * @brief Stop the Producer-Consumer architecture.
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_stop(void);

/**
 * @brief Deinitialize the Producer-Consumer architecture.
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_deinit(void);

/**
 * @brief Get the current status of the Producer-Consumer architecture.
 * @param status Output parameter receiving the status
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_get_status(rt_process_status_t *status);

/**
 * @brief Get the statistics of the Producer-Consumer architecture.
 * @param stats Output parameter receiving the statistics
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_get_stats(rt_process_stats_t *stats);

/**
 * @brief Set the sampling frequency of the Producer-Consumer architecture.
 * @param frequency_hz New sampling frequency in Hz
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_set_frequency(float frequency_hz);

/**
 * @brief Check whether the Producer-Consumer architecture is running.
 * @param is_running Output parameter: true when running
 * @return ESP_OK on success, an error code on failure
 */
esp_err_t arch_pc_is_running(bool *is_running);

/**
 * @brief Enable or disable acceleration detection with LCD feedback.
 * @param enable true to enable detection, false to disable it
 * @return ESP_OK on success, an error code on failure
 * @note When enabled, the LCD shows red while the condition is met and
 *       white otherwise.
 * @note Condition: |x| > 0.5g OR |y| > 0.5g OR |z| < 0.5g
 */
esp_err_t arch_pc_set_accel_detection(bool enable);

#ifdef __cplusplus
}
#endif

arch_producer_consumer.c

/**
 * @file arch_producer_consumer.c
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief Architecture 1: Producer-Consumer Queue Implementation
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 */

#include "arch_producer_consumer.h"
#include "node_mqtt.h"  // Provides s_mqtt_client, s_is_mqtt_connected, MQTT_PUBLISH_TOPIC
#include "node_lcd.h"   // LCD control for acceleration detection
#include "esp_log.h"
#include "esp_timer.h"
#include "esp_heap_caps.h"  // For PSRAM allocation
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include <string.h>
#include <stdio.h>
#include <math.h>

/* ============================================================================
 * PRIVATE DEFINITIONS
 * ============================================================================ */

static const char *TAG = "RTArch_PC";

/* ============================================================================
 * PRIVATE VARIABLES
 * ============================================================================ */

// Active configuration. These initial values are the built-in defaults;
// arch_pc_init() replaces them with the caller's configuration when one
// is supplied.
static rt_process_config_t s_config = {
    .sampling_frequency_hz = 100.0f,
    .queue_size = RT_PC_QUEUE_SIZE_DEFAULT,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = false,
    .mqtt_topic = NULL
};

// Circular (ring) buffer shared between the producer and consumer tasks.
// Both tasks take `mutex` before touching the buffer contents or the
// read/write pointers.
typedef struct {
    rt_process_sample_t *buffer;      // Data buffer (allocated from PSRAM in arch_pc_init)
    uint32_t buffer_size;              // Buffer size (number of samples)
    volatile uint32_t write_ptr;       // Write pointer (producer): index of the next slot to write
    volatile uint32_t read_ptr;        // Read pointer (consumer): index of the next slot to read
    SemaphoreHandle_t mutex;          // Mutex for synchronization
    uint32_t overwrite_count;          // Count of overwrites (monitoring)
    uint32_t total_writes;            // Total number of writes
    uint32_t total_reads;             // Total number of reads
} rt_circular_buffer_t;

// Module state: ring buffer, task/timer handles and lifecycle flags.
static rt_circular_buffer_t s_circular_buffer = {0};
static TaskHandle_t s_producer_task_handle = NULL;
static TaskHandle_t s_consumer_task_handle = NULL;
static esp_timer_handle_t s_sampling_timer = NULL;
static bool s_is_running = false;       // Set by arch_pc_start(), cleared by arch_pc_stop(); polled by both tasks
static bool s_is_initialized = false;   // Set by arch_pc_init(), cleared by arch_pc_deinit()
static adxl355_handle_t *s_adxl355_handle = NULL;  // Sensor handle supplied via arch_pc_set_sensor_handle()

// Statistics
static rt_process_stats_t s_stats = {0};

// Acceleration detection state
static bool s_enable_accel_detection = false;  // Enable acceleration detection and LCD control
static uint16_t s_current_lcd_color = 0xFFFF; // Current LCD color (WHITE)
static uint64_t s_last_lcd_update_us = 0;     // esp_timer timestamp of the last LCD update made by the consumer
static uint64_t s_color_hold_until_us = 0;    // Time until which color should be held (for persistence)
static bool s_color_hold_active = false;      // Whether color hold is currently active
static bool s_lcd_update_in_progress = false; // Flag to prevent concurrent LCD updates

// LCD color hold duration: how long red is kept after the last triggering sample
#define LCD_COLOR_HOLD_DURATION_US 300000U  // 0.3 seconds (300ms) in microseconds
// LCD update minimum interval to avoid SPI bus conflict
#define LCD_UPDATE_MIN_INTERVAL_US 100000U  // Minimum 100ms between LCD updates

// Detection thresholds (in g); the trigger is the OR of the three conditions
#define ACCEL_THRESHOLD_X 0.5f  // Condition 1: |x| > 0.5g
#define ACCEL_THRESHOLD_Y 0.5f  // Condition 2: |y| > 0.5g
#define ACCEL_THRESHOLD_Z 0.5f  // Condition 3: |z| < 0.5g

// Circular buffer configuration
#define CIRCULAR_BUFFER_SIZE 512  // Buffer size: 512 samples
#define CONSUMER_PROCESS_INTERVAL_MS 50  // Consumer processes buffer every 50ms
#define CONSUMER_PROCESS_SAMPLE_COUNT 10  // Max samples the consumer pulls per cycle, starting at the read pointer (oldest unread first)

/* ============================================================================
 * PRIVATE FUNCTION DECLARATIONS
 * ============================================================================ */

// Forward declarations of the file-local helpers defined below.
static void sampling_timer_callback(void *arg);          // esp_timer callback: notifies the producer task
static void producer_task(void *pvParameters);           // Reads the sensor and writes into the ring buffer
static void consumer_task(void *pvParameters);           // Periodically drains and processes the ring buffer
static void update_stats_acquisition(uint64_t time_us);  // Updates the averaged acquisition time
static void update_stats_processing(uint64_t time_us);   // Updates the averaged processing time

/* ============================================================================
 * PRIVATE FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

/**
 * @brief Sampling tick handler: wakes the producer task.
 *
 * Does nothing unless the architecture is running, a sensor handle has been
 * set and the producer task exists. The sensor itself is read by the
 * producer task, not here.
 *
 * @param arg Unused.
 */
static void sampling_timer_callback(void *arg)
{
    (void)arg;

    const bool ready_to_sample = s_is_running && (s_adxl355_handle != NULL);

    if (ready_to_sample && s_producer_task_handle != NULL)
    {
        // Set notification bit 0; the producer is waiting in xTaskNotifyWait().
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}

/**
 * @brief Producer task: Reads sensor data and enqueues
 */
static void producer_task(void *pvParameters)
{
    rt_process_sample_t sample;
    uint32_t notification_value;
    int64_t start_time;

    ESP_LOGI(TAG, "Producer task started");

    while (s_is_running)
    {
        // Wait for timer notification
        if (xTaskNotifyWait(0, ULONG_MAX, &notification_value, portMAX_DELAY) == pdTRUE)
        {
            if (!s_is_running || s_adxl355_handle == NULL)
            {
                break;
            }

            start_time = esp_timer_get_time();

            // Read sensor data
            adxl355_accelerations_t accel;
            float temperature;
            esp_err_t accel_ret = adxl355_read_accelerations(s_adxl355_handle, &accel);
            esp_err_t temp_ret = adxl355_read_temperature(s_adxl355_handle, &temperature);

            if (accel_ret == ESP_OK && temp_ret == ESP_OK)
            {
                // Prepare sample
                sample.x = accel.x;
                sample.y = accel.y;
                sample.z = accel.z;
                sample.temp = temperature;
                sample.timestamp_us = esp_timer_get_time();

                // Update acquisition statistics
                update_stats_acquisition(esp_timer_get_time() - start_time);

                // Write to circular buffer (FIFO overwrite)
                if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
                {
                    // Take mutex to protect buffer access
                    if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(10)) == pdTRUE)
                    {
                        // Write sample to current write position
                        uint32_t write_idx = s_circular_buffer.write_ptr;
                        s_circular_buffer.buffer[write_idx] = sample;

                        // Update write pointer (circular)
                        uint32_t next_write_ptr = (write_idx + 1) % s_circular_buffer.buffer_size;

                        // Check if write pointer will catch up with read pointer (buffer full)
                        if (next_write_ptr == s_circular_buffer.read_ptr && s_circular_buffer.total_writes >= s_circular_buffer.buffer_size)
                        {
                            // Buffer full: advance read pointer (FIFO overwrite)
                            s_circular_buffer.read_ptr = (s_circular_buffer.read_ptr + 1) % s_circular_buffer.buffer_size;
                            s_circular_buffer.overwrite_count++;
                        }

                        s_circular_buffer.write_ptr = next_write_ptr;
                        s_circular_buffer.total_writes++;

                        xSemaphoreGive(s_circular_buffer.mutex);
                        s_stats.total_samples++;
                    }
                    else
                    {
                        // Mutex timeout - drop sample
                        s_stats.dropped_samples++;
                        ESP_LOGW(TAG, "Mutex timeout, sample dropped");
                    }
                }
                else
                {
                    s_stats.dropped_samples++;
                    ESP_LOGW(TAG, "Circular buffer not initialized, sample dropped");
                }
            }
            else
            {
                ESP_LOGE(TAG, "Failed to read sensor: accel=%s, temp=%s",
                         esp_err_to_name(accel_ret), esp_err_to_name(temp_ret));
            }
        }
    }

    ESP_LOGI(TAG, "Producer task stopped");
    vTaskDelete(NULL);
}

/**
 * @brief Consumer task: Periodically extracts entire buffer snapshot for analysis
 */
static void consumer_task(void *pvParameters)
{
    // Local buffer to store last N samples (fast access)
    rt_process_sample_t recent_samples[CONSUMER_PROCESS_SAMPLE_COUNT];

    ESP_LOGI(TAG, "Consumer task started (circular buffer mode, processing last %d samples)",
             CONSUMER_PROCESS_SAMPLE_COUNT);

    while (s_is_running)
    {
        // Wait for processing interval
        vTaskDelay(pdMS_TO_TICKS(CONSUMER_PROCESS_INTERVAL_MS));

        if (!s_is_running)
        {
            break;
        }

        // Extract entire buffer snapshot
        if (s_circular_buffer.buffer != NULL && s_circular_buffer.mutex != NULL)
        {
            int64_t process_start_time = esp_timer_get_time();

            // Take mutex to protect buffer access
            if (xSemaphoreTake(s_circular_buffer.mutex, pdMS_TO_TICKS(50)) == pdTRUE)
            {
                // Standard circular buffer read: read from read_ptr
                uint32_t write_ptr = s_circular_buffer.write_ptr;
                uint32_t read_ptr = s_circular_buffer.read_ptr;
                uint32_t overwrite_count = s_circular_buffer.overwrite_count;

                // Calculate available samples (distance between read_ptr and write_ptr)
                uint32_t available_samples;
                if (write_ptr >= read_ptr)
                {
                    available_samples = write_ptr - read_ptr;  // Normal case
                }
                else
                {
                    available_samples = (s_circular_buffer.buffer_size - read_ptr) + write_ptr;  // Wrapped around
                }

                // Read samples from read pointer (up to CONSUMER_PROCESS_SAMPLE_COUNT)
                uint32_t samples_to_read = (available_samples > CONSUMER_PROCESS_SAMPLE_COUNT) ? 
                                           CONSUMER_PROCESS_SAMPLE_COUNT : available_samples;

                if (samples_to_read > 0)
                {
                    // Copy samples starting from read pointer
                    for (uint32_t i = 0; i < samples_to_read; i++)
                    {
                        uint32_t src_idx = (read_ptr + i) % s_circular_buffer.buffer_size;
                        recent_samples[i] = s_circular_buffer.buffer[src_idx];
                    }

                    // Update read pointer (advance read head)
                    s_circular_buffer.read_ptr = (read_ptr + samples_to_read) % s_circular_buffer.buffer_size;
                    s_circular_buffer.total_reads += samples_to_read;

                    // Fill remaining slots with zeros if not enough samples
                    for (uint32_t i = samples_to_read; i < CONSUMER_PROCESS_SAMPLE_COUNT; i++)
                    {
                        recent_samples[i].x = 0.0f;
                        recent_samples[i].y = 0.0f;
                        recent_samples[i].z = 0.0f;
                        recent_samples[i].temp = 0.0f;
                        recent_samples[i].timestamp_us = 0;
                    }
                }
                else
                {
                    // No data available, fill with zeros
                    memset(recent_samples, 0, CONSUMER_PROCESS_SAMPLE_COUNT * sizeof(rt_process_sample_t));
                }

                xSemaphoreGive(s_circular_buffer.mutex);

                // Now process the recent samples
                // This is where you can add FFT, filtering, feature extraction, etc.

                // Update processing statistics
                uint64_t process_time_us = esp_timer_get_time() - process_start_time;
                update_stats_processing(process_time_us);
                // Only count actual samples read
                uint32_t actual_samples = samples_to_read;
                s_stats.processed_samples += actual_samples;

                // Use actual_samples for processing
                uint32_t samples_to_process = actual_samples > 0 ? actual_samples : CONSUMER_PROCESS_SAMPLE_COUNT;

                // Acceleration detection based on recent samples
                if (s_enable_accel_detection)
                {
                    // Check if any of the recent samples meets conditions
                    bool condition_met = false;
                    for (uint32_t i = 0; i < samples_to_process; i++)
                    {
                        if ((fabsf(recent_samples[i].x) > ACCEL_THRESHOLD_X) ||
                            (fabsf(recent_samples[i].y) > ACCEL_THRESHOLD_Y) ||
                            (fabsf(recent_samples[i].z) < ACCEL_THRESHOLD_Z))
                        {
                            condition_met = true;
                            break;
                        }
                    }

                    // LCD control (same logic as before)
                    uint64_t current_time = esp_timer_get_time();
                    uint16_t target_color;
                    bool should_update = false;

                    if (condition_met)
                    {
                        target_color = RED;
                        if (!s_color_hold_active || s_current_lcd_color != RED)
                        {
                            s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                            s_color_hold_active = true;
                            should_update = true;
                            ESP_LOGI(TAG, "LCD -> RED (buffer analysis)");
                        }
                        else
                        {
                            s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                        }
                    }
                    else
                    {
                        if (s_color_hold_active && current_time < s_color_hold_until_us)
                        {
                            target_color = RED;
                        }
                        else
                        {
                            target_color = WHITE;
                            if (s_color_hold_active)
                            {
                                s_color_hold_active = false;
                                should_update = true;
                                ESP_LOGI(TAG, "LCD -> WHITE (hold expired)");
                            }
                        }
                    }

                    if (should_update && target_color != s_current_lcd_color)
                    {
                        // Check minimum interval and update in progress flag to avoid SPI bus conflict
                        uint64_t time_since_last_update = current_time - s_last_lcd_update_us;
                        if (!s_lcd_update_in_progress && time_since_last_update >= LCD_UPDATE_MIN_INTERVAL_US)
                        {
                            s_lcd_update_in_progress = true;
                            lcd_clear(target_color);
                            s_current_lcd_color = target_color;
                            s_last_lcd_update_us = current_time;
                            s_lcd_update_in_progress = false;
                        }
                        else
                        {
                            if (s_lcd_update_in_progress)
                            {
                                ESP_LOGD(TAG, "LCD update skipped: previous update still in progress");
                            }
                            else
                            {
                                ESP_LOGD(TAG, "LCD update skipped: too frequent (last update %llu us ago)",
                                         time_since_last_update);
                            }
                        }
                    }
                }

                // Output analysis results
                if (s_config.enable_serial)
                {
                    // Output recent samples processing info
                    printf("RECENT_SAMPLES: count=%u, overwrites=%lu, process_time=%llu us\n",
                           (unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT,
                           (unsigned long)overwrite_count, process_time_us);
                }

                // Log periodically
                static uint32_t log_counter = 0;
                if (++log_counter >= 20)  // Log every 20 processing cycles (1 second at 50ms interval)
                {
                    ESP_LOGI(TAG, "Recent samples processed: %u samples, overwrites=%lu, process_time=%llu us",
                             (unsigned int)CONSUMER_PROCESS_SAMPLE_COUNT, (unsigned long)overwrite_count, process_time_us);
                    log_counter = 0;
                }
            }
            else
            {
                ESP_LOGW(TAG, "Failed to acquire mutex for buffer snapshot");
            }
        }
    }

    ESP_LOGI(TAG, "Consumer task stopped");
    vTaskDelete(NULL);
}

/**
 * @brief Fold one acquisition duration into the running average.
 *
 * While no sample has been counted yet (total_samples == 0) the average is
 * set to the new value; afterwards it is an exponential moving average with
 * weights 0.9 (previous average) and 0.1 (new value).
 *
 * @param time_us Duration of the latest acquisition in microseconds
 */
static void update_stats_acquisition(uint64_t time_us)
{
    const float latest_us = (float)time_us;
    const bool no_samples_yet = (s_stats.total_samples == 0);

    s_stats.avg_acquisition_time_us = no_samples_yet
        ? latest_us
        : (s_stats.avg_acquisition_time_us * 0.9f) + (latest_us * 0.1f);
}

/**
 * @brief Fold one processing duration into the running average.
 *
 * While no sample has been processed yet (processed_samples == 0) the average
 * is set to the new value; afterwards it is an exponential moving average
 * with weights 0.9 (previous average) and 0.1 (new value).
 *
 * @param time_us Duration of the latest processing cycle in microseconds
 */
static void update_stats_processing(uint64_t time_us)
{
    const float latest_us = (float)time_us;
    const bool nothing_processed_yet = (s_stats.processed_samples == 0);

    s_stats.avg_process_time_us = nothing_processed_yet
        ? latest_us
        : (s_stats.avg_process_time_us * 0.9f) + (latest_us * 0.1f);
}

/* ============================================================================
 * PUBLIC FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

esp_err_t arch_pc_init(const rt_process_config_t *config)
{
    if (s_is_initialized)
    {
        ESP_LOGW(TAG, "Architecture already initialized");
        return ESP_ERR_INVALID_STATE;
    }

    // Apply configuration
    if (config != NULL)
    {
        memcpy(&s_config, config, sizeof(rt_process_config_t));
    }
    else
    {
        // Use defaults
        s_config.sampling_frequency_hz = 100.0f;
        s_config.queue_size = RT_PC_QUEUE_SIZE_DEFAULT;
        s_config.enable_mqtt = false;  // Default: MQTT disabled
        s_config.enable_serial = true;  // Default: Serial enabled
        s_config.enable_accel_detection = false;  // Default: Detection disabled
        s_config.mqtt_topic = NULL;
    }

    // Set acceleration detection state from config
    s_enable_accel_detection = s_config.enable_accel_detection;

    // Validate configuration
    if (s_config.sampling_frequency_hz <= 0.0f || s_config.sampling_frequency_hz > 1000.0f)
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz (valid range: 0.1 - 1000 Hz)",
                 s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    // Initialize circular buffer
    s_circular_buffer.buffer_size = CIRCULAR_BUFFER_SIZE;
    s_circular_buffer.write_ptr = 0;   // 写头初始化为0
    s_circular_buffer.read_ptr = 0;    // 读头初始化为0
    s_circular_buffer.overwrite_count = 0;
    s_circular_buffer.total_writes = 0;
    s_circular_buffer.total_reads = 0;

    // Allocate circular buffer from PSRAM (for large buffer)
    size_t required_bytes = CIRCULAR_BUFFER_SIZE * sizeof(rt_process_sample_t);
    size_t psram_free = heap_caps_get_free_size(MALLOC_CAP_SPIRAM);

    if (psram_free < required_bytes)
    {
        ESP_LOGE(TAG, "PSRAM insufficient: available %zu bytes, need %zu bytes",
                 psram_free, required_bytes);
        return ESP_ERR_NO_MEM;
    }

    s_circular_buffer.buffer = (rt_process_sample_t *)heap_caps_malloc(
        required_bytes,
        MALLOC_CAP_SPIRAM  // Use PSRAM for large buffer
    );

    if (s_circular_buffer.buffer == NULL)
    {
        ESP_LOGE(TAG, "Failed to allocate circular buffer from PSRAM");
        ESP_LOGE(TAG, "Available PSRAM: %zu bytes", heap_caps_get_free_size(MALLOC_CAP_SPIRAM));
        return ESP_ERR_NO_MEM;
    }

    // Initialize buffer to zero
    memset(s_circular_buffer.buffer, 0, required_bytes);

    // Create mutex for buffer synchronization
    s_circular_buffer.mutex = xSemaphoreCreateMutex();
    if (s_circular_buffer.mutex == NULL)
    {
        ESP_LOGE(TAG, "Failed to create mutex");
        heap_caps_free(s_circular_buffer.buffer);
        s_circular_buffer.buffer = NULL;
        return ESP_ERR_NO_MEM;
    }

    ESP_LOGI(TAG, "Circular buffer allocated: %u samples (%zu bytes) from PSRAM",
             CIRCULAR_BUFFER_SIZE, required_bytes);

    // Initialize statistics
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    s_is_initialized = true;

    ESP_LOGI(TAG, "Producer-Consumer architecture initialized (Circular Buffer Mode):");
    ESP_LOGI(TAG, "  - Sampling frequency: %.2f Hz", s_config.sampling_frequency_hz);
    ESP_LOGI(TAG, "  - Circular buffer size: %u samples", CIRCULAR_BUFFER_SIZE);
    ESP_LOGI(TAG, "  - Consumer process interval: %u ms", CONSUMER_PROCESS_INTERVAL_MS);
    ESP_LOGI(TAG, "  - MQTT enabled: %s", s_config.enable_mqtt ? "Yes" : "No");
    ESP_LOGI(TAG, "  - Serial output enabled: %s", s_config.enable_serial ? "Yes" : "No");

    return ESP_OK;
}

/**
 * @brief Register the ADXL355 handle the producer task reads from.
 * @param handle Sensor handle; must not be NULL
 * @return ESP_OK when stored, ESP_ERR_INVALID_ARG when @p handle is NULL
 */
esp_err_t arch_pc_set_sensor_handle(adxl355_handle_t *handle)
{
    esp_err_t result = ESP_ERR_INVALID_ARG;

    if (handle != NULL)
    {
        s_adxl355_handle = handle;
        ESP_LOGI(TAG, "Sensor handle set");
        result = ESP_OK;
    }

    return result;
}

/**
 * @brief Common tail of every start-up failure path: delete the sampling
 *        timer, forget its handle and clear the running flag.
 */
static void pc_start_discard_timer(void)
{
    esp_timer_delete(s_sampling_timer);
    s_sampling_timer = NULL;
    s_is_running = false;
}

/**
 * @brief Start sampling: create the timer and both tasks, trigger one
 *        immediate sample, then start the periodic timer.
 *
 * On any failure after the timer has been created, everything created so far
 * is torn down again and the running flag is cleared.
 *
 * @return ESP_OK on success,
 *         ESP_ERR_INVALID_STATE if not initialized, already running, or no
 *         sensor handle has been set,
 *         ESP_ERR_INVALID_ARG if the timer period would be below 100 us,
 *         ESP_ERR_NO_MEM if a task cannot be created,
 *         or the error returned by the esp_timer API
 */
esp_err_t arch_pc_start(void)
{
    if (!s_is_initialized)
    {
        ESP_LOGE(TAG, "Architecture not initialized");
        return ESP_ERR_INVALID_STATE;
    }
    if (s_is_running)
    {
        ESP_LOGW(TAG, "Architecture already running");
        return ESP_ERR_INVALID_STATE;
    }
    if (s_adxl355_handle == NULL)
    {
        ESP_LOGE(TAG, "Sensor handle not set");
        return ESP_ERR_INVALID_STATE;
    }

    // Timer period in microseconds derived from the configured frequency
    const uint64_t period_us = (uint64_t)(1000000.0f / s_config.sampling_frequency_hz);
    if (period_us < 100)
    {
        ESP_LOGE(TAG, "Sampling frequency too high: %.2f Hz", s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    const esp_timer_create_args_t timer_args = {
        .callback = sampling_timer_callback,
        .arg = NULL,
        .name = "rt_pc_sampling_timer"
    };

    esp_err_t ret = esp_timer_create(&timer_args, &s_sampling_timer);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to create timer: %s", esp_err_to_name(ret));
        return ret;
    }

    // The tasks loop on this flag, so it must be set before they are created
    s_is_running = true;

    // Producer task (high priority)
    if (xTaskCreate(producer_task, "rt_pc_producer", RT_PC_PRODUCER_STACK_SIZE,
                    NULL, RT_PC_PRODUCER_PRIORITY, &s_producer_task_handle) != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create producer task");
        pc_start_discard_timer();
        return ESP_ERR_NO_MEM;
    }

    // Consumer task (medium priority)
    if (xTaskCreate(consumer_task, "rt_pc_consumer", RT_PC_CONSUMER_STACK_SIZE,
                    NULL, RT_PC_CONSUMER_PRIORITY, &s_consumer_task_handle) != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create consumer task");
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
        pc_start_discard_timer();
        return ESP_ERR_NO_MEM;
    }

    // Trigger the first sample right away instead of waiting one period
    sampling_timer_callback(NULL);

    ret = esp_timer_start_periodic(s_sampling_timer, period_us);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to start timer: %s", esp_err_to_name(ret));
        vTaskDelete(s_producer_task_handle);
        vTaskDelete(s_consumer_task_handle);
        s_producer_task_handle = NULL;
        s_consumer_task_handle = NULL;
        pc_start_discard_timer();
        return ret;
    }

    ESP_LOGI(TAG, "Producer-Consumer architecture started (frequency: %.2f Hz, period: %llu us)",
             s_config.sampling_frequency_hz, period_us);

    return ESP_OK;
}

esp_err_t arch_pc_stop(void)
{
    if (!s_is_running)
    {
        ESP_LOGW(TAG, "Architecture not running");
        return ESP_ERR_INVALID_STATE;
    }

    s_is_running = false;

    // Stop timer
    if (s_sampling_timer != NULL)
    {
        esp_timer_stop(s_sampling_timer);
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
    }

    // Tasks will exit when s_is_running becomes false
    // Wait a bit for tasks to finish
    vTaskDelay(pdMS_TO_TICKS(100));

    // Force delete tasks if still running
    if (s_producer_task_handle != NULL)
    {
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
    }

    if (s_consumer_task_handle != NULL)
    {
        vTaskDelete(s_consumer_task_handle);
        s_consumer_task_handle = NULL;
    }

    ESP_LOGI(TAG, "Producer-Consumer architecture stopped");
    return ESP_OK;
}

esp_err_t arch_pc_deinit(void)
{
    if (s_is_running)
    {
        arch_pc_stop();
    }

    // Delete mutex
    if (s_circular_buffer.mutex != NULL)
    {
        vSemaphoreDelete(s_circular_buffer.mutex);
        s_circular_buffer.mutex = NULL;
    }

    // Free circular buffer (PSRAM)
    if (s_circular_buffer.buffer != NULL)
    {
        heap_caps_free(s_circular_buffer.buffer);
        s_circular_buffer.buffer = NULL;
    }

    s_circular_buffer.buffer_size = 0;
    s_circular_buffer.write_ptr = 0;
    s_circular_buffer.read_ptr = 0;
    s_circular_buffer.overwrite_count = 0;
    s_circular_buffer.total_writes = 0;
    s_circular_buffer.total_reads = 0;

    s_is_initialized = false;
    s_adxl355_handle = NULL;
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    ESP_LOGI(TAG, "Producer-Consumer architecture deinitialized");
    return ESP_OK;
}

/**
 * @brief Report the running state, architecture type, sampling frequency and
 *        buffer fill level.
 *
 * The fill level is the distance from the read pointer to the write pointer
 * (with wrap-around), or 0 when no buffer is allocated. The pointers are read
 * without taking the buffer mutex. queue_usage and buffer_usage are set to
 * the same value.
 *
 * @param status Output structure; must not be NULL
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG when @p status is NULL
 */
esp_err_t arch_pc_get_status(rt_process_status_t *status)
{
    if (status == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    uint32_t fill_level = 0;

    if (s_circular_buffer.buffer != NULL)
    {
        const uint32_t head = s_circular_buffer.write_ptr;
        const uint32_t tail = s_circular_buffer.read_ptr;

        fill_level = (head >= tail)
            ? (head - tail)                                       // Normal case
            : (s_circular_buffer.buffer_size - tail) + head;      // Wrapped around
    }

    status->is_running = s_is_running;
    status->arch_type = RT_ARCH_PRODUCER_CONSUMER;
    status->sampling_frequency_hz = s_config.sampling_frequency_hz;
    status->queue_usage = fill_level;
    status->buffer_usage = status->queue_usage;  // Same as queue_usage for circular buffer

    return ESP_OK;
}

/**
 * @brief Copy the current statistics into the caller's structure.
 * @param stats Output structure; must not be NULL
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG when @p stats is NULL
 */
esp_err_t arch_pc_get_stats(rt_process_stats_t *stats)
{
    if (stats != NULL)
    {
        *stats = s_stats;
        return ESP_OK;
    }

    return ESP_ERR_INVALID_ARG;
}

/**
 * @brief Change the sampling frequency.
 *
 * If the architecture is running it is stopped, the new frequency is stored,
 * and it is started again; otherwise only the stored frequency changes.
 * If the restart fails, the new frequency stays stored and the architecture
 * is left stopped.
 *
 * @param frequency_hz New sampling frequency in Hz; must be in (0, 1000]
 * @return ESP_OK on success,
 *         ESP_ERR_INVALID_ARG if @p frequency_hz is out of range or NaN,
 *         or the error returned by arch_pc_start() when restarting
 */
esp_err_t arch_pc_set_frequency(float frequency_hz)
{
    // Negated in-range test: NaN fails every comparison, so a plain
    // "<= 0 || > 1000" check would let it through to the period computation.
    if (!(frequency_hz > 0.0f && frequency_hz <= 1000.0f))
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz", frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    bool was_running = s_is_running;
    if (was_running)
    {
        arch_pc_stop();
    }

    s_config.sampling_frequency_hz = frequency_hz;
    ESP_LOGI(TAG, "Sampling frequency updated to %.2f Hz", frequency_hz);

    if (was_running)
    {
        // The timer period is derived from s_config inside arch_pc_start()
        return arch_pc_start();
    }

    return ESP_OK;
}

/**
 * @brief Report whether the architecture is currently running.
 * @param is_running Output flag; must not be NULL
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG when @p is_running is NULL
 */
esp_err_t arch_pc_is_running(bool *is_running)
{
    if (is_running != NULL)
    {
        *is_running = s_is_running;
        return ESP_OK;
    }

    return ESP_ERR_INVALID_ARG;
}

/**
 * @brief Switch acceleration detection (with LCD feedback) on or off.
 *
 * In both cases the LCD is cleared to white and the color-hold state is
 * reset; only the stored flag and the log message differ.
 *
 * @param enable true to enable detection, false to disable
 * @return ESP_OK (always)
 */
esp_err_t arch_pc_set_accel_detection(bool enable)
{
    s_enable_accel_detection = enable;

    // Return the LCD and the hold logic to the normal (white) state
    lcd_clear(WHITE);
    s_current_lcd_color = WHITE;
    s_color_hold_active = false;
    s_color_hold_until_us = 0;

    if (enable)
    {
        ESP_LOGI(TAG, "Acceleration detection enabled with LCD feedback (0.3s persistence)");
    }
    else
    {
        ESP_LOGI(TAG, "Acceleration detection disabled");
    }

    return ESP_OK;
}

关键实现

定时器回调

定时器回调使用 xTaskNotify 通知 Producer 任务,避免在定时器上下文中进行阻塞操作:

/* Excerpt: timer callback. It only notifies the producer task; the sensor
 * read itself happens in the producer task. */
static void sampling_timer_callback(void *arg)
{
    // Nothing to do while stopped or before a sensor handle has been set
    if (!s_is_running || s_adxl355_handle == NULL)
    {
        return;
    }

    // Notify the producer task to read the sensor
    if (s_producer_task_handle != NULL)
    {
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}

Producer 任务

Producer 任务等待定时器通知,读取传感器数据,并写入环形缓冲区:

  • 事件驱动:使用 xTaskNotifyWait 阻塞等待定时器通知,等待期间不占用 CPU
  • 互斥锁保护:以 10ms 超时获取互斥锁以保护缓冲区访问
  • FIFO 覆盖:缓冲区满时,推进读指针以覆盖最旧数据
  • 统计:更新采集时间和总样本计数

Consumer 任务

Consumer 任务定期处理环形缓冲区:

  • 固定间隔:每 50ms 处理一次
  • 样本提取:从读指针开始,每次最多读取 10 个尚未读取的样本(最旧的优先)
  • 处理:执行特征提取、检测和输出
  • 统计:更新处理时间和已处理样本计数

环形缓冲区管理

环形缓冲区使用互斥锁保护的写/读指针机制:

  • 写指针:Producer 写入后推进
  • 读指针:Consumer 读取后推进
  • 环绕:两个指针在到达缓冲区末尾时环绕
  • 覆盖检测:跟踪写指针追上读指针时的情况

使用示例

#include "real-time-process-arch.h"

// Select architecture 1 (default)
// #define RT_PROCESS_ARCH_TYPE RT_ARCH_PRODUCER_CONSUMER
// NOTE(review): the header is included twice in this example; presumably only
// the include that follows the (optional) define above is intended - confirm.
#include "real-time-process-arch.h"

// Initialization
rt_process_config_t config = {
    .sampling_frequency_hz = 100.0f,
    .queue_size = 50,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = true,
    .mqtt_topic = NULL
};

// NOTE(review): the return values are not checked in this example; real code
// should verify that each call returned ESP_OK.
esp_err_t ret = rt_process_init(&config);
rt_process_set_sensor_handle(adxl355_handle);
rt_process_start();

// Monitoring
rt_process_stats_t stats;
rt_process_get_stats(&stats);
ESP_LOGI("App", "总数: %lu, 已处理: %lu, 丢弃: %lu",
         stats.total_samples, stats.processed_samples, stats.dropped_samples);

// Shutdown
rt_process_stop();
rt_process_deinit();