Skip to content

CODE

Warning

The following code should be based on the code in the release code, which may have been updated.

arch_dma_dual_core.h

/**
 * @file arch_dma_dual_core.h
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief Architecture 3: DMA + Dual Core Division of Labor
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 * @details
 * This architecture implements dual-core division of labor:
 * - Core 0: Producer task (data acquisition with DMA)
 * - Core 1: Consumer task (data processing)
 * - Double buffer mechanism for parallel operation
 * - DMA for SPI transfers to reduce CPU load
 */

#pragma once

#include "arch_common.h"
#include "node_acc_adxl355.h"
#include "esp_err.h"

#ifdef __cplusplus
extern "C"
{
#endif

/* ============================================================================
 * CONFIGURATION
 * ============================================================================ */

/**
 * @brief Buffer size for DMA dual core buffering (number of samples per buffer)
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_BUFFER_SIZE from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_BUFFER_SIZE
#define RT_DMA_DC_BUFFER_SIZE TINY_MEASUREMENT_RT_DMA_DC_BUFFER_SIZE
#endif

/**
 * @brief Producer task priority (high priority for timely acquisition)
 * @note Runs on Core 0
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_PRIORITY from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_PRODUCER_PRIORITY
#define RT_DMA_DC_PRODUCER_PRIORITY TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_PRIORITY
#endif

/**
 * @brief Consumer task priority (high priority for processing)
 * @note Runs on Core 1
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_PRIORITY from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_CONSUMER_PRIORITY
#define RT_DMA_DC_CONSUMER_PRIORITY TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_PRIORITY
#endif

/**
 * @brief Producer task stack size
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_STACK_SIZE from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_PRODUCER_STACK_SIZE
#define RT_DMA_DC_PRODUCER_STACK_SIZE TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_STACK_SIZE
#endif

/**
 * @brief Consumer task stack size
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_STACK_SIZE from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_CONSUMER_STACK_SIZE
#define RT_DMA_DC_CONSUMER_STACK_SIZE TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_STACK_SIZE
#endif

/**
 * @brief Core assignment for producer task
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_CORE from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_PRODUCER_CORE
#define RT_DMA_DC_PRODUCER_CORE TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_CORE
#endif

/**
 * @brief Core assignment for consumer task
 * @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_CORE from tiny_measurement_config.h
 */
#ifndef RT_DMA_DC_CONSUMER_CORE
#define RT_DMA_DC_CONSUMER_CORE TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_CORE
#endif

/* ============================================================================
 * FUNCTION DECLARATIONS
 * ============================================================================ */

/**
 * @brief Initialize DMA + Dual Core architecture
 * @param config Configuration structure
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_init(const rt_process_config_t *config);

/**
 * @brief Set sensor handle for DMA + Dual Core architecture
 * @param handle ADXL355 sensor handle
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_set_sensor_handle(adxl355_handle_t *handle);

/**
 * @brief Start DMA + Dual Core architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_start(void);

/**
 * @brief Stop DMA + Dual Core architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_stop(void);

/**
 * @brief Deinitialize DMA + Dual Core architecture
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_deinit(void);

/**
 * @brief Get status of DMA + Dual Core architecture
 * @param status Output parameter for status
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_get_status(rt_process_status_t *status);

/**
 * @brief Get statistics of DMA + Dual Core architecture
 * @param stats Output parameter for statistics
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_get_stats(rt_process_stats_t *stats);

/**
 * @brief Set sampling frequency for DMA + Dual Core architecture
 * @param frequency_hz New sampling frequency in Hz
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_set_frequency(float frequency_hz);

/**
 * @brief Check if DMA + Dual Core architecture is running
 * @param is_running Output parameter: true if running
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_is_running(bool *is_running);

/**
 * @brief Enable or disable acceleration detection with LCD feedback
 * @param enable true to enable detection, false to disable
 * @return ESP_OK on success, error code on failure
 */
esp_err_t arch_dma_dc_set_accel_detection(bool enable);

#ifdef __cplusplus
}
#endif

arch_dma_dual_core.c

/**
 * @file arch_dma_dual_core.c
 * @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
 * @brief Architecture 3: DMA + Dual Core Division of Labor Implementation
 * @version 1.0
 * @date 2025-01-17
 * @copyright Copyright (c) 2025
 *
 * @details
 * Dual-core architecture:
 * - Core 0: Producer task (data acquisition with DMA)
 * - Core 1: Consumer task (data processing)
 * - Double buffer mechanism for parallel operation
 */

#include "arch_dma_dual_core.h"
#include "node_mqtt.h"  // Provides s_mqtt_client, s_is_mqtt_connected, MQTT_PUBLISH_TOPIC
#include "node_lcd.h"   // LCD control for acceleration detection
#include "esp_log.h"
#include "esp_timer.h"
#include "esp_heap_caps.h"  // For PSRAM allocation
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include <string.h>
#include <stdio.h>
#include <math.h>

/* ============================================================================
 * PRIVATE DEFINITIONS
 * ============================================================================ */

static const char *TAG = "RTArch_DMA_DC";

/* ============================================================================
 * PRIVATE VARIABLES
 * ============================================================================ */

// Configuration
static rt_process_config_t s_config = {
    .sampling_frequency_hz = 100.0f,
    .queue_size = RT_DMA_DC_BUFFER_SIZE,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = false,
    .mqtt_topic = NULL
};

// Double buffer structure (same as DMA double buffer, but with core pinning)
typedef struct {
    rt_process_sample_t *buffer_a;        // Buffer A (PSRAM)
    rt_process_sample_t *buffer_b;        // Buffer B (PSRAM)
    uint32_t buffer_size;                 // Size of each buffer
    volatile uint32_t buffer_a_write_idx; // Write index for buffer A
    volatile uint32_t buffer_b_write_idx; // Write index for buffer B
    volatile bool buffer_a_ready;         // Buffer A is ready for processing
    volatile bool buffer_b_ready;         // Buffer B is ready for processing
    volatile uint32_t active_buffer;      // 0 = buffer A, 1 = buffer B (for producer)
    SemaphoreHandle_t buffer_a_mutex;     // Mutex for buffer A
    SemaphoreHandle_t buffer_b_mutex;     // Mutex for buffer B
    SemaphoreHandle_t buffer_a_sem;       // Semaphore: buffer A ready for consumer
    SemaphoreHandle_t buffer_b_sem;       // Semaphore: buffer B ready for consumer
    uint32_t overwrite_count;             // Count of buffer overwrites
    uint32_t total_writes;                 // Total number of writes
} rt_dc_double_buffer_t;

static rt_dc_double_buffer_t s_double_buffer = {0};

// Task and timer handles
static TaskHandle_t s_producer_task_handle = NULL;  // Core 0
static TaskHandle_t s_consumer_task_handle = NULL;  // Core 1
static esp_timer_handle_t s_sampling_timer = NULL;
static bool s_is_running = false;
static bool s_is_initialized = false;
static adxl355_handle_t *s_adxl355_handle = NULL;

// Statistics
static rt_process_stats_t s_stats = {0};

// Acceleration detection state
static bool s_enable_accel_detection = false;
static uint16_t s_current_lcd_color = 0xFFFF; // Current LCD color (WHITE)
static uint64_t s_last_lcd_update_us = 0;
static uint64_t s_color_hold_until_us = 0;
static bool s_color_hold_active = false;
static bool s_lcd_update_in_progress = false;

// LCD color hold duration: 0.3 seconds
#define LCD_COLOR_HOLD_DURATION_US 300000U  // 0.3 seconds (300ms) in microseconds
// LCD update minimum interval to avoid SPI bus conflict
#define LCD_UPDATE_MIN_INTERVAL_US 100000U  // Minimum 100ms between LCD updates

// Detection thresholds
#define ACCEL_THRESHOLD_X 0.5f  // Condition 1: |x| > 0.5g
#define ACCEL_THRESHOLD_Y 0.5f  // Condition 2: |y| > 0.5g
#define ACCEL_THRESHOLD_Z 0.5f  // Condition 3: |z| < 0.5g

/* ============================================================================
 * PRIVATE FUNCTION DECLARATIONS
 * ============================================================================ */

static void sampling_timer_callback(void *arg);
static void producer_task(void *pvParameters);  // Runs on Core 0
static void consumer_task(void *pvParameters);  // Runs on Core 1
static void update_stats_acquisition(uint64_t time_us);
static void update_stats_processing(uint64_t time_us);

/* ============================================================================
 * PRIVATE FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

/**
 * @brief Timer callback for periodic sampling (triggers producer task on Core 0)
 */
static void sampling_timer_callback(void *arg)
{
    if (!s_is_running || s_adxl355_handle == NULL)
    {
        return;
    }

    // Notify producer task (Core 0) to read sensor
    if (s_producer_task_handle != NULL)
    {
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}

/**
 * @brief Update acquisition time statistics
 */
static void update_stats_acquisition(uint64_t time_us)
{
    // Simple moving average
    if (s_stats.total_samples == 0)
    {
        s_stats.avg_acquisition_time_us = (float)time_us;
    }
    else
    {
        s_stats.avg_acquisition_time_us = 
            (s_stats.avg_acquisition_time_us * 0.9f) + ((float)time_us * 0.1f);
    }
}

/**
 * @brief Update processing time statistics
 */
static void update_stats_processing(uint64_t time_us)
{
    // Simple moving average
    if (s_stats.processed_samples == 0)
    {
        s_stats.avg_process_time_us = (float)time_us;
    }
    else
    {
        s_stats.avg_process_time_us = 
            (s_stats.avg_process_time_us * 0.9f) + ((float)time_us * 0.1f);
    }
}

/**
 * @brief Producer task: Reads sensor data and fills double buffers (Core 0)
 * @note This task is pinned to Core 0 for dedicated data acquisition
 */
static void producer_task(void *pvParameters)
{
    rt_process_sample_t sample;
    uint32_t notification_value;
    int64_t start_time;

    ESP_LOGI(TAG, "Producer task started on Core %d (DMA + Dual Core mode)", 
             xPortGetCoreID());

    while (s_is_running)
    {
        // Wait for timer notification
        if (xTaskNotifyWait(0, ULONG_MAX, &notification_value, portMAX_DELAY) == pdTRUE)
        {
            if (!s_is_running || s_adxl355_handle == NULL)
            {
                break;
            }

            start_time = esp_timer_get_time();

            // Read sensor data (DMA will be used automatically by SPI driver)
            adxl355_accelerations_t accel;
            float temperature;
            esp_err_t accel_ret = adxl355_read_accelerations(s_adxl355_handle, &accel);
            esp_err_t temp_ret = adxl355_read_temperature(s_adxl355_handle, &temperature);

            if (accel_ret == ESP_OK && temp_ret == ESP_OK)
            {
                // Prepare sample
                sample.x = accel.x;
                sample.y = accel.y;
                sample.z = accel.z;
                sample.temp = temperature;
                sample.timestamp_us = esp_timer_get_time();

                // Update acquisition statistics
                update_stats_acquisition(esp_timer_get_time() - start_time);

                // Write to active buffer (A or B)
                uint32_t active = s_double_buffer.active_buffer;
                SemaphoreHandle_t mutex = (active == 0) ? s_double_buffer.buffer_a_mutex : s_double_buffer.buffer_b_mutex;
                rt_process_sample_t *target_buffer = (active == 0) ? s_double_buffer.buffer_a : s_double_buffer.buffer_b;
                volatile uint32_t *write_idx = (active == 0) ? &s_double_buffer.buffer_a_write_idx : &s_double_buffer.buffer_b_write_idx;

                if (xSemaphoreTake(mutex, pdMS_TO_TICKS(10)) == pdTRUE)
                {
                    // Write sample to current buffer
                    target_buffer[*write_idx] = sample;
                    (*write_idx)++;
                    s_double_buffer.total_writes++;

                    // Check if buffer is full
                    if (*write_idx >= s_double_buffer.buffer_size)
                    {
                        // Buffer full: mark as ready and switch to other buffer
                        if (active == 0)
                        {
                            s_double_buffer.buffer_a_ready = true;
                            s_double_buffer.active_buffer = 1;  // Switch to buffer B
                            s_double_buffer.buffer_a_write_idx = 0;  // Reset write index
                            xSemaphoreGive(s_double_buffer.buffer_a_sem);  // Notify consumer (Core 1)
                        }
                        else
                        {
                            s_double_buffer.buffer_b_ready = true;
                            s_double_buffer.active_buffer = 0;  // Switch to buffer A
                            s_double_buffer.buffer_b_write_idx = 0;  // Reset write index
                            xSemaphoreGive(s_double_buffer.buffer_b_sem);  // Notify consumer (Core 1)
                        }

                        // Check for overwrite (if consumer didn't process in time)
                        if ((active == 0 && s_double_buffer.buffer_a_ready) ||
                            (active == 1 && s_double_buffer.buffer_b_ready))
                        {
                            s_double_buffer.overwrite_count++;
                            ESP_LOGW(TAG, "Buffer overwrite detected (buffer %s)", active == 0 ? "A" : "B");
                        }
                    }

                    xSemaphoreGive(mutex);
                    s_stats.total_samples++;
                }
                else
                {
                    s_stats.dropped_samples++;
                    ESP_LOGW(TAG, "Mutex timeout, sample dropped");
                }
            }
            else
            {
                ESP_LOGE(TAG, "Failed to read sensor: accel=%s, temp=%s",
                         esp_err_to_name(accel_ret), esp_err_to_name(temp_ret));
            }
        }
    }

    ESP_LOGI(TAG, "Producer task stopped (Core %d)", xPortGetCoreID());
    vTaskDelete(NULL);
}

/**
 * @brief Consumer task: Processes filled buffers from double buffer (Core 1)
 * @note This task is pinned to Core 1 for dedicated data processing
 */
static void consumer_task(void *pvParameters)
{
    ESP_LOGI(TAG, "Consumer task started on Core %d (DMA + Dual Core mode)", 
             xPortGetCoreID());

    while (s_is_running)
    {
        // Wait for either buffer A or buffer B to be ready
        uint32_t buffer_to_process = 0;  // 0 = buffer A, 1 = buffer B
        bool buffer_ready = false;

        // Wait for buffer A or buffer B to be ready (with timeout)
        if (xSemaphoreTake(s_double_buffer.buffer_a_sem, pdMS_TO_TICKS(100)) == pdTRUE)
        {
            buffer_to_process = 0;
            buffer_ready = true;
        }
        else if (xSemaphoreTake(s_double_buffer.buffer_b_sem, pdMS_TO_TICKS(100)) == pdTRUE)
        {
            buffer_to_process = 1;
            buffer_ready = true;
        }

        if (!s_is_running)
        {
            break;
        }

        if (buffer_ready)
        {
            int64_t process_start_time = esp_timer_get_time();

            // Get buffer and mutex for the buffer to process
            rt_process_sample_t *process_buffer = (buffer_to_process == 0) ? 
                                                   s_double_buffer.buffer_a : s_double_buffer.buffer_b;
            SemaphoreHandle_t mutex = (buffer_to_process == 0) ? 
                                      s_double_buffer.buffer_a_mutex : s_double_buffer.buffer_b_mutex;
            uint32_t buffer_size = s_double_buffer.buffer_size;

            // Process the buffer
            if (xSemaphoreTake(mutex, pdMS_TO_TICKS(100)) == pdTRUE)
            {
                // Process all samples in the buffer
                for (uint32_t i = 0; i < buffer_size; i++)
                {
                    rt_process_sample_t *sample = &process_buffer[i];

                    // Acceleration detection
                    if (s_enable_accel_detection)
                    {
                        bool condition_met = (fabsf(sample->x) > ACCEL_THRESHOLD_X) ||
                                             (fabsf(sample->y) > ACCEL_THRESHOLD_Y) ||
                                             (fabsf(sample->z) < ACCEL_THRESHOLD_Z);

                        uint64_t current_time = esp_timer_get_time();
                        uint16_t target_color;
                        bool should_update = false;

                        if (condition_met)
                        {
                            target_color = RED;
                            if (!s_color_hold_active || s_current_lcd_color != RED)
                            {
                                s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                                s_color_hold_active = true;
                                should_update = true;
                                ESP_LOGI(TAG, "LCD -> RED (buffer %s, Core %d)", 
                                         buffer_to_process == 0 ? "A" : "B", xPortGetCoreID());
                            }
                            else
                            {
                                s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
                            }
                        }
                        else
                        {
                            if (s_color_hold_active && current_time < s_color_hold_until_us)
                            {
                                target_color = RED;
                            }
                            else
                            {
                                target_color = WHITE;
                                if (s_color_hold_active)
                                {
                                    s_color_hold_active = false;
                                    should_update = true;
                                    ESP_LOGI(TAG, "LCD -> WHITE (hold expired, Core %d)", xPortGetCoreID());
                                }
                            }
                        }

                        if (should_update && target_color != s_current_lcd_color)
                        {
                            uint64_t time_since_last_update = current_time - s_last_lcd_update_us;
                            if (!s_lcd_update_in_progress && time_since_last_update >= LCD_UPDATE_MIN_INTERVAL_US)
                            {
                                s_lcd_update_in_progress = true;
                                lcd_clear(target_color);
                                s_current_lcd_color = target_color;
                                s_last_lcd_update_us = current_time;
                                s_lcd_update_in_progress = false;
                            }
                        }
                    }

                    // Serial output (if enabled)
                    if (s_config.enable_serial && i == 0)  // Output first sample only
                    {
                        printf("BUFFER_%s_CORE1: x=%.3f,y=%.3f,z=%.3f,temp=%.1f\n",
                               buffer_to_process == 0 ? "A" : "B",
                               sample->x, sample->y, sample->z, sample->temp);
                    }
                }

                // Mark buffer as processed and reset ready flag
                if (buffer_to_process == 0)
                {
                    s_double_buffer.buffer_a_ready = false;
                }
                else
                {
                    s_double_buffer.buffer_b_ready = false;
                }

                xSemaphoreGive(mutex);

                // Update processing statistics
                uint64_t process_time_us = esp_timer_get_time() - process_start_time;
                update_stats_processing(process_time_us);
                s_stats.processed_samples += buffer_size;

                // Log periodically
                static uint32_t log_counter = 0;
                if (++log_counter >= 10)
                {
                    ESP_LOGI(TAG, "Buffer %s processed on Core %d: %u samples, process_time=%llu us",
                             buffer_to_process == 0 ? "A" : "B", xPortGetCoreID(), buffer_size, process_time_us);
                    log_counter = 0;
                }
            }
            else
            {
                ESP_LOGW(TAG, "Failed to acquire mutex for buffer %s", buffer_to_process == 0 ? "A" : "B");
            }
        }
    }

    ESP_LOGI(TAG, "Consumer task stopped (Core %d)", xPortGetCoreID());
    vTaskDelete(NULL);
}

/* ============================================================================
 * PUBLIC FUNCTION IMPLEMENTATIONS
 * ============================================================================ */

esp_err_t arch_dma_dc_init(const rt_process_config_t *config)
{
    if (s_is_initialized)
    {
        ESP_LOGW(TAG, "Architecture already initialized");
        return ESP_ERR_INVALID_STATE;
    }

    // Apply configuration
    if (config != NULL)
    {
        memcpy(&s_config, config, sizeof(rt_process_config_t));
    }
    else
    {
        // Use defaults
        s_config.sampling_frequency_hz = 100.0f;
        s_config.queue_size = RT_DMA_DC_BUFFER_SIZE;
        s_config.enable_mqtt = false;
        s_config.enable_serial = true;
        s_config.enable_accel_detection = false;
        s_config.mqtt_topic = NULL;
    }

    // Set acceleration detection state from config
    s_enable_accel_detection = s_config.enable_accel_detection;

    // Validate configuration
    if (s_config.sampling_frequency_hz <= 0.0f || s_config.sampling_frequency_hz > 10000.0f)
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz (valid range: 0.1 - 10000 Hz)",
                 s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    // Initialize double buffer
    s_double_buffer.buffer_size = RT_DMA_DC_BUFFER_SIZE;
    s_double_buffer.buffer_a_write_idx = 0;
    s_double_buffer.buffer_b_write_idx = 0;
    s_double_buffer.buffer_a_ready = false;
    s_double_buffer.buffer_b_ready = false;
    s_double_buffer.active_buffer = 0;  // Start with buffer A
    s_double_buffer.overwrite_count = 0;
    s_double_buffer.total_writes = 0;

    // Allocate buffers from PSRAM
    size_t required_bytes = RT_DMA_DC_BUFFER_SIZE * sizeof(rt_process_sample_t);
    size_t psram_free = heap_caps_get_free_size(MALLOC_CAP_SPIRAM);

    if (psram_free < required_bytes * 2)
    {
        ESP_LOGE(TAG, "PSRAM insufficient: available %zu bytes, need %zu bytes",
                 psram_free, required_bytes * 2);
        return ESP_ERR_NO_MEM;
    }

    // Allocate buffer A
    s_double_buffer.buffer_a = (rt_process_sample_t *)heap_caps_malloc(
        required_bytes,
        MALLOC_CAP_SPIRAM
    );
    if (s_double_buffer.buffer_a == NULL)
    {
        ESP_LOGE(TAG, "Failed to allocate buffer A from PSRAM");
        return ESP_ERR_NO_MEM;
    }
    memset(s_double_buffer.buffer_a, 0, required_bytes);

    // Allocate buffer B
    s_double_buffer.buffer_b = (rt_process_sample_t *)heap_caps_malloc(
        required_bytes,
        MALLOC_CAP_SPIRAM
    );
    if (s_double_buffer.buffer_b == NULL)
    {
        ESP_LOGE(TAG, "Failed to allocate buffer B from PSRAM");
        heap_caps_free(s_double_buffer.buffer_a);
        s_double_buffer.buffer_a = NULL;
        return ESP_ERR_NO_MEM;
    }
    memset(s_double_buffer.buffer_b, 0, required_bytes);

    // Create mutexes
    s_double_buffer.buffer_a_mutex = xSemaphoreCreateMutex();
    s_double_buffer.buffer_b_mutex = xSemaphoreCreateMutex();
    if (s_double_buffer.buffer_a_mutex == NULL || s_double_buffer.buffer_b_mutex == NULL)
    {
        ESP_LOGE(TAG, "Failed to create mutexes");
        if (s_double_buffer.buffer_a != NULL) heap_caps_free(s_double_buffer.buffer_a);
        if (s_double_buffer.buffer_b != NULL) heap_caps_free(s_double_buffer.buffer_b);
        if (s_double_buffer.buffer_a_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
        if (s_double_buffer.buffer_b_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
        return ESP_ERR_NO_MEM;
    }

    // Create semaphores (binary semaphores for buffer ready signals)
    s_double_buffer.buffer_a_sem = xSemaphoreCreateBinary();
    s_double_buffer.buffer_b_sem = xSemaphoreCreateBinary();
    if (s_double_buffer.buffer_a_sem == NULL || s_double_buffer.buffer_b_sem == NULL)
    {
        ESP_LOGE(TAG, "Failed to create semaphores");
        if (s_double_buffer.buffer_a != NULL) heap_caps_free(s_double_buffer.buffer_a);
        if (s_double_buffer.buffer_b != NULL) heap_caps_free(s_double_buffer.buffer_b);
        if (s_double_buffer.buffer_a_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
        if (s_double_buffer.buffer_b_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
        return ESP_ERR_NO_MEM;
    }

    // Initialize statistics
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    s_is_initialized = true;

    ESP_LOGI(TAG, "DMA + Dual Core architecture initialized:");
    ESP_LOGI(TAG, "  - Sampling frequency: %.2f Hz", s_config.sampling_frequency_hz);
    ESP_LOGI(TAG, "  - Buffer size: %u samples per buffer", RT_DMA_DC_BUFFER_SIZE);
    ESP_LOGI(TAG, "  - Total buffer memory: %zu bytes (PSRAM)", required_bytes * 2);
    ESP_LOGI(TAG, "  - Producer task: Core %d", RT_DMA_DC_PRODUCER_CORE);
    ESP_LOGI(TAG, "  - Consumer task: Core %d", RT_DMA_DC_CONSUMER_CORE);
    ESP_LOGI(TAG, "  - MQTT enabled: %s", s_config.enable_mqtt ? "Yes" : "No");
    ESP_LOGI(TAG, "  - Serial output enabled: %s", s_config.enable_serial ? "Yes" : "No");

    return ESP_OK;
}

esp_err_t arch_dma_dc_set_sensor_handle(adxl355_handle_t *handle)
{
    if (handle == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    s_adxl355_handle = handle;
    ESP_LOGI(TAG, "Sensor handle set");
    return ESP_OK;
}

esp_err_t arch_dma_dc_start(void)
{
    if (!s_is_initialized)
    {
        ESP_LOGE(TAG, "Architecture not initialized");
        return ESP_ERR_INVALID_STATE;
    }

    if (s_is_running)
    {
        ESP_LOGW(TAG, "Architecture already running");
        return ESP_ERR_INVALID_STATE;
    }

    if (s_adxl355_handle == NULL)
    {
        ESP_LOGE(TAG, "Sensor handle not set");
        return ESP_ERR_INVALID_STATE;
    }

    // Calculate timer period
    uint64_t period_us = (uint64_t)(1000000.0f / s_config.sampling_frequency_hz);
    if (period_us < 100)
    {
        ESP_LOGE(TAG, "Sampling frequency too high: %.2f Hz", s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    // Create sampling timer
    esp_timer_create_args_t timer_args = {
        .callback = sampling_timer_callback,
        .arg = NULL,
        .name = "rt_dma_dc_sampling_timer"
    };

    esp_err_t ret = esp_timer_create(&timer_args, &s_sampling_timer);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to create timer: %s", esp_err_to_name(ret));
        return ret;
    }

    // Set running flag
    s_is_running = true;

    // Create producer task on Core 0 (high priority)
    BaseType_t task_ret = xTaskCreatePinnedToCore(
        producer_task,
        "rt_dma_dc_producer",
        RT_DMA_DC_PRODUCER_STACK_SIZE,
        NULL,
        RT_DMA_DC_PRODUCER_PRIORITY,
        &s_producer_task_handle,
        RT_DMA_DC_PRODUCER_CORE  // Pin to Core 0
    );

    if (task_ret != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create producer task on Core %d", RT_DMA_DC_PRODUCER_CORE);
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ESP_ERR_NO_MEM;
    }

    // Create consumer task on Core 1 (high priority)
    task_ret = xTaskCreatePinnedToCore(
        consumer_task,
        "rt_dma_dc_consumer",
        RT_DMA_DC_CONSUMER_STACK_SIZE,
        NULL,
        RT_DMA_DC_CONSUMER_PRIORITY,
        &s_consumer_task_handle,
        RT_DMA_DC_CONSUMER_CORE  // Pin to Core 1
    );

    if (task_ret != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create consumer task on Core %d", RT_DMA_DC_CONSUMER_CORE);
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ESP_ERR_NO_MEM;
    }

    // Perform immediate first sample
    sampling_timer_callback(NULL);

    // Start periodic timer
    ret = esp_timer_start_periodic(s_sampling_timer, period_us);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to start timer: %s", esp_err_to_name(ret));
        vTaskDelete(s_producer_task_handle);
        vTaskDelete(s_consumer_task_handle);
        s_producer_task_handle = NULL;
        s_consumer_task_handle = NULL;
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
        s_is_running = false;
        return ret;
    }

    ESP_LOGI(TAG, "DMA + Dual Core architecture started:");
    ESP_LOGI(TAG, "  - Frequency: %.2f Hz, period: %llu us", s_config.sampling_frequency_hz, period_us);
    ESP_LOGI(TAG, "  - Producer task: Core %d", RT_DMA_DC_PRODUCER_CORE);
    ESP_LOGI(TAG, "  - Consumer task: Core %d", RT_DMA_DC_CONSUMER_CORE);

    return ESP_OK;
}

esp_err_t arch_dma_dc_stop(void)
{
    if (!s_is_running)
    {
        ESP_LOGW(TAG, "Architecture not running");
        return ESP_ERR_INVALID_STATE;
    }

    s_is_running = false;

    // Stop timer
    if (s_sampling_timer != NULL)
    {
        esp_timer_stop(s_sampling_timer);
        esp_timer_delete(s_sampling_timer);
        s_sampling_timer = NULL;
    }

    // Tasks will exit when s_is_running becomes false
    // Wait a bit for tasks to finish
    vTaskDelay(pdMS_TO_TICKS(100));

    // Force delete tasks if still running
    if (s_producer_task_handle != NULL)
    {
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
    }

    if (s_consumer_task_handle != NULL)
    {
        vTaskDelete(s_consumer_task_handle);
        s_consumer_task_handle = NULL;
    }

    ESP_LOGI(TAG, "DMA + Dual Core architecture stopped");
    return ESP_OK;
}

esp_err_t arch_dma_dc_deinit(void)
{
    if (s_is_running)
    {
        arch_dma_dc_stop();
    }

    // Delete semaphores
    if (s_double_buffer.buffer_a_sem != NULL)
    {
        vSemaphoreDelete(s_double_buffer.buffer_a_sem);
        s_double_buffer.buffer_a_sem = NULL;
    }

    if (s_double_buffer.buffer_b_sem != NULL)
    {
        vSemaphoreDelete(s_double_buffer.buffer_b_sem);
        s_double_buffer.buffer_b_sem = NULL;
    }

    // Delete mutexes
    if (s_double_buffer.buffer_a_mutex != NULL)
    {
        vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
        s_double_buffer.buffer_a_mutex = NULL;
    }

    if (s_double_buffer.buffer_b_mutex != NULL)
    {
        vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
        s_double_buffer.buffer_b_mutex = NULL;
    }

    // Free buffers (PSRAM)
    if (s_double_buffer.buffer_a != NULL)
    {
        heap_caps_free(s_double_buffer.buffer_a);
        s_double_buffer.buffer_a = NULL;
    }

    if (s_double_buffer.buffer_b != NULL)
    {
        heap_caps_free(s_double_buffer.buffer_b);
        s_double_buffer.buffer_b = NULL;
    }

    s_double_buffer.buffer_size = 0;
    s_double_buffer.buffer_a_write_idx = 0;
    s_double_buffer.buffer_b_write_idx = 0;
    s_double_buffer.buffer_a_ready = false;
    s_double_buffer.buffer_b_ready = false;
    s_double_buffer.active_buffer = 0;
    s_double_buffer.overwrite_count = 0;
    s_double_buffer.total_writes = 0;

    s_is_initialized = false;
    s_adxl355_handle = NULL;
    memset(&s_stats, 0, sizeof(rt_process_stats_t));

    ESP_LOGI(TAG, "DMA + Dual Core architecture deinitialized");
    return ESP_OK;
}

esp_err_t arch_dma_dc_get_status(rt_process_status_t *status)
{
    if (status == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    status->is_running = s_is_running;
    status->arch_type = RT_ARCH_DMA_DUAL_CORE;
    status->sampling_frequency_hz = s_config.sampling_frequency_hz;

    // Calculate buffer usage (sum of both buffers)
    if (s_double_buffer.buffer_a != NULL && s_double_buffer.buffer_b != NULL)
    {
        status->buffer_usage = s_double_buffer.buffer_a_write_idx + s_double_buffer.buffer_b_write_idx;
        status->queue_usage = status->buffer_usage;  // Same as buffer_usage for double buffer
    }
    else
    {
        status->buffer_usage = 0;
        status->queue_usage = 0;
    }

    return ESP_OK;
}

esp_err_t arch_dma_dc_get_stats(rt_process_stats_t *stats)
{
    if (stats == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    memcpy(stats, &s_stats, sizeof(rt_process_stats_t));
    return ESP_OK;
}

esp_err_t arch_dma_dc_set_frequency(float frequency_hz)
{
    if (frequency_hz <= 0.0f || frequency_hz > 10000.0f)
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz", frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    bool was_running = s_is_running;
    if (was_running)
    {
        arch_dma_dc_stop();
    }

    s_config.sampling_frequency_hz = frequency_hz;
    ESP_LOGI(TAG, "Sampling frequency updated to %.2f Hz", frequency_hz);

    if (was_running)
    {
        return arch_dma_dc_start();
    }

    return ESP_OK;
}

esp_err_t arch_dma_dc_is_running(bool *is_running)
{
    if (is_running == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    *is_running = s_is_running;
    return ESP_OK;
}

esp_err_t arch_dma_dc_set_accel_detection(bool enable)
{
    s_enable_accel_detection = enable;

    if (enable)
    {
        // Initialize LCD to white (normal state)
        lcd_clear(WHITE);
        s_current_lcd_color = WHITE;
        s_color_hold_active = false;
        s_color_hold_until_us = 0;
        ESP_LOGI(TAG, "Acceleration detection enabled with LCD feedback (0.3s persistence)");
    }
    else
    {
        // Reset LCD to white when disabled
        lcd_clear(WHITE);
        s_current_lcd_color = WHITE;
        s_color_hold_active = false;
        s_color_hold_until_us = 0;
        ESP_LOGI(TAG, "Acceleration detection disabled");
    }

    return ESP_OK;
}

Key Implementation

Core Pinning

Tasks are explicitly pinned to specific cores using xTaskCreatePinnedToCore():

// Producer on Core 0
xTaskCreatePinnedToCore(
    producer_task,
    "rt_dma_dc_producer",
    RT_DMA_DC_PRODUCER_STACK_SIZE,
    NULL,
    RT_DMA_DC_PRODUCER_PRIORITY,
    &s_producer_task_handle,
    RT_DMA_DC_PRODUCER_CORE  // Core 0
);

// Consumer on Core 1
xTaskCreatePinnedToCore(
    consumer_task,
    "rt_dma_dc_consumer",
    RT_DMA_DC_CONSUMER_STACK_SIZE,
    NULL,
    RT_DMA_DC_CONSUMER_PRIORITY,
    &s_consumer_task_handle,
    RT_DMA_DC_CONSUMER_CORE  // Core 1
);

Cross-Core Synchronization

Mutexes and semaphores work across cores:

  • Mutexes: Protect buffer access between Core 0 (producer) and Core 1 (consumer)
  • Semaphores: Signal buffer ready state across cores
  • Volatile Flags: Ensure proper visibility across cores

Producer Task (Core 0)

The producer task runs exclusively on Core 0:

  • Core Verification: Logs core ID on startup for verification
  • Dedicated CPU: Has dedicated CPU time for acquisition
  • No Interference: Not affected by processing workload on Core 1

Consumer Task (Core 1)

The consumer task runs exclusively on Core 1:

  • Core Verification: Logs core ID on startup for verification
  • Dedicated CPU: Has dedicated CPU time for processing
  • No Interference: Not affected by acquisition workload on Core 0

Usage Example

// Select Architecture 3
#define RT_PROCESS_ARCH_TYPE RT_ARCH_DMA_DUAL_CORE
#include "real-time-process-arch.h"

// Initialize
rt_process_config_t config = {
    .sampling_frequency_hz = 2000.0f,  // 2 kHz
    .queue_size = 256,
    .enable_mqtt = false,
    .enable_serial = true,
    .enable_accel_detection = false,
    .mqtt_topic = NULL
};

esp_err_t ret = rt_process_init(&config);
rt_process_set_sensor_handle(adxl355_handle);
rt_process_start();

// Verify core assignment (check logs)
// Producer should log: "Producer task started on Core 0"
// Consumer should log: "Consumer task started on Core 1"

// Monitor
rt_process_status_t status;
rt_process_get_status(&status);
ESP_LOGI("App", "Running: %s, Buffer usage: %lu", 
         status.is_running ? "Yes" : "No", status.buffer_usage);

// Stop
rt_process_stop();
rt_process_deinit();