CODE¶
Warning
The code below is a snapshot for reference. Treat the code in the official release as authoritative, since it may have been updated after this page was written.
arch_dma_dual_core.h¶
/**
* @file arch_dma_dual_core.h
* @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
* @brief Architecture 3: DMA + Dual Core Division of Labor
* @version 1.0
* @date 2025-01-17
* @copyright Copyright (c) 2025
*
* @details
* This architecture implements dual-core division of labor:
* - Core 0: Producer task (data acquisition with DMA)
* - Core 1: Consumer task (data processing)
* - Double buffer mechanism for parallel operation
* - DMA for SPI transfers to reduce CPU load
*/
#pragma once
#include "arch_common.h"
#include "node_acc_adxl355.h"
#include "esp_err.h"
#ifdef __cplusplus
extern "C"
{
#endif
/* ============================================================================
* CONFIGURATION
* ============================================================================ */
/**
* @brief Buffer size for DMA dual core buffering (number of samples per buffer)
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_BUFFER_SIZE from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_BUFFER_SIZE
#define RT_DMA_DC_BUFFER_SIZE TINY_MEASUREMENT_RT_DMA_DC_BUFFER_SIZE
#endif
/**
* @brief Producer task priority (high priority for timely acquisition)
* @note Runs on Core 0
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_PRIORITY from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_PRODUCER_PRIORITY
#define RT_DMA_DC_PRODUCER_PRIORITY TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_PRIORITY
#endif
/**
* @brief Consumer task priority (high priority for processing)
* @note Runs on Core 1
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_PRIORITY from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_CONSUMER_PRIORITY
#define RT_DMA_DC_CONSUMER_PRIORITY TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_PRIORITY
#endif
/**
* @brief Producer task stack size
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_STACK_SIZE from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_PRODUCER_STACK_SIZE
#define RT_DMA_DC_PRODUCER_STACK_SIZE TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_STACK_SIZE
#endif
/**
* @brief Consumer task stack size
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_STACK_SIZE from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_CONSUMER_STACK_SIZE
#define RT_DMA_DC_CONSUMER_STACK_SIZE TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_STACK_SIZE
#endif
/**
* @brief Core assignment for producer task
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_CORE from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_PRODUCER_CORE
#define RT_DMA_DC_PRODUCER_CORE TINY_MEASUREMENT_RT_DMA_DC_PRODUCER_CORE
#endif
/**
* @brief Core assignment for consumer task
* @note Uses TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_CORE from tiny_measurement_config.h
*/
#ifndef RT_DMA_DC_CONSUMER_CORE
#define RT_DMA_DC_CONSUMER_CORE TINY_MEASUREMENT_RT_DMA_DC_CONSUMER_CORE
#endif
/* ============================================================================
* FUNCTION DECLARATIONS
* ============================================================================ */
/**
* @brief Initialize DMA + Dual Core architecture
* @param config Configuration structure
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_init(const rt_process_config_t *config);
/**
* @brief Set sensor handle for DMA + Dual Core architecture
* @param handle ADXL355 sensor handle
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_set_sensor_handle(adxl355_handle_t *handle);
/**
* @brief Start DMA + Dual Core architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_start(void);
/**
* @brief Stop DMA + Dual Core architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_stop(void);
/**
* @brief Deinitialize DMA + Dual Core architecture
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_deinit(void);
/**
* @brief Get status of DMA + Dual Core architecture
* @param status Output parameter for status
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_get_status(rt_process_status_t *status);
/**
* @brief Get statistics of DMA + Dual Core architecture
* @param stats Output parameter for statistics
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_get_stats(rt_process_stats_t *stats);
/**
* @brief Set sampling frequency for DMA + Dual Core architecture
* @param frequency_hz New sampling frequency in Hz
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_set_frequency(float frequency_hz);
/**
* @brief Check if DMA + Dual Core architecture is running
* @param is_running Output parameter: true if running
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_is_running(bool *is_running);
/**
* @brief Enable or disable acceleration detection with LCD feedback
* @param enable true to enable detection, false to disable
* @return ESP_OK on success, error code on failure
*/
esp_err_t arch_dma_dc_set_accel_detection(bool enable);
#ifdef __cplusplus
}
#endif
arch_dma_dual_core.c¶
/**
* @file arch_dma_dual_core.c
* @author SHUAIWEN CUI (SHUAIWEN001@e.ntu.edu.sg)
* @brief Architecture 3: DMA + Dual Core Division of Labor Implementation
* @version 1.0
* @date 2025-01-17
* @copyright Copyright (c) 2025
*
* @details
* Dual-core architecture:
* - Core 0: Producer task (data acquisition with DMA)
* - Core 1: Consumer task (data processing)
* - Double buffer mechanism for parallel operation
*/
#include "arch_dma_dual_core.h"
#include "node_mqtt.h" // Provides s_mqtt_client, s_is_mqtt_connected, MQTT_PUBLISH_TOPIC
#include "node_lcd.h" // LCD control for acceleration detection
#include "esp_log.h"
#include "esp_timer.h"
#include "esp_heap_caps.h" // For PSRAM allocation
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include <string.h>
#include <stdio.h>
#include <math.h>
/* ============================================================================
* PRIVATE DEFINITIONS
* ============================================================================ */
static const char *TAG = "RTArch_DMA_DC";
/* ============================================================================
* PRIVATE VARIABLES
* ============================================================================ */
// Active configuration. These are the values used until arch_dma_dc_init()
// replaces them with a caller-supplied configuration (or re-applies the same
// defaults when it is passed NULL).
// Note: queue_size is stored here, but arch_dma_dc_init() sizes the buffers
// from RT_DMA_DC_BUFFER_SIZE directly rather than from this field.
static rt_process_config_t s_config = {
.sampling_frequency_hz = 100.0f,
.queue_size = RT_DMA_DC_BUFFER_SIZE,
.enable_mqtt = false,
.enable_serial = true,
.enable_accel_detection = false,
.mqtt_topic = NULL
};
// Double-buffer state shared between the producer task (writer) and the
// consumer task (reader). The producer appends samples to the "active"
// buffer; when that buffer is full it is flagged ready, the matching
// semaphore is given, and the producer switches to the other buffer.
typedef struct {
rt_process_sample_t *buffer_a; // Buffer A, allocated with MALLOC_CAP_SPIRAM in arch_dma_dc_init()
rt_process_sample_t *buffer_b; // Buffer B, allocated with MALLOC_CAP_SPIRAM in arch_dma_dc_init()
uint32_t buffer_size; // Capacity of each buffer, in samples
volatile uint32_t buffer_a_write_idx; // Next free slot in buffer A (reset to 0 when A fills)
volatile uint32_t buffer_b_write_idx; // Next free slot in buffer B (reset to 0 when B fills)
volatile bool buffer_a_ready; // Set by the producer when A is full; cleared by the consumer after processing
volatile bool buffer_b_ready; // Set by the producer when B is full; cleared by the consumer after processing
volatile uint32_t active_buffer; // Buffer the producer is currently filling: 0 = A, 1 = B
SemaphoreHandle_t buffer_a_mutex; // Held while buffer A is being written or processed
SemaphoreHandle_t buffer_b_mutex; // Held while buffer B is being written or processed
SemaphoreHandle_t buffer_a_sem; // Binary semaphore given by the producer when A becomes ready
SemaphoreHandle_t buffer_b_sem; // Binary semaphore given by the producer when B becomes ready
uint32_t overwrite_count; // Number of overwrite conditions recorded by the producer
uint32_t total_writes; // Number of samples written into either buffer
} rt_dc_double_buffer_t;
// Single instance used by this module; zeroed until arch_dma_dc_init() runs
static rt_dc_double_buffer_t s_double_buffer = {0};
// Task and timer handles (created in arch_dma_dc_start(), released in arch_dma_dc_stop())
static TaskHandle_t s_producer_task_handle = NULL; // Producer task, pinned to RT_DMA_DC_PRODUCER_CORE
static TaskHandle_t s_consumer_task_handle = NULL; // Consumer task, pinned to RT_DMA_DC_CONSUMER_CORE
static esp_timer_handle_t s_sampling_timer = NULL; // Periodic timer that wakes the producer
// NOTE(review): s_is_running is read by both tasks and the timer callback and
// written by start()/stop(), but is neither volatile nor atomic - confirm this
// is acceptable for the target toolchain.
static bool s_is_running = false;
static bool s_is_initialized = false;
static adxl355_handle_t *s_adxl355_handle = NULL; // Set via arch_dma_dc_set_sensor_handle(); cleared in deinit
// Statistics (returned by arch_dma_dc_get_stats())
static rt_process_stats_t s_stats = {0};
// Acceleration detection state (used by the consumer task and arch_dma_dc_set_accel_detection())
static bool s_enable_accel_detection = false;
static uint16_t s_current_lcd_color = 0xFFFF; // Last colour written to the LCD (0xFFFF, noted as WHITE by the author)
static uint64_t s_last_lcd_update_us = 0; // esp_timer timestamp of the last lcd_clear() issued by the consumer
static uint64_t s_color_hold_until_us = 0; // RED is kept on screen until this timestamp
static bool s_color_hold_active = false; // True while a RED hold period is in effect
static bool s_lcd_update_in_progress = false; // Set around the consumer's lcd_clear() call
// How long RED stays on the LCD after the last triggering sample
#define LCD_COLOR_HOLD_DURATION_US 300000U // 0.3 seconds (300ms) in microseconds
// Minimum spacing between LCD updates (author's note: avoids SPI bus conflict)
#define LCD_UPDATE_MIN_INTERVAL_US 100000U // Minimum 100ms between LCD updates
// Detection thresholds, in g; a sample triggers when ANY condition holds
#define ACCEL_THRESHOLD_X 0.5f // Condition 1: |x| > 0.5g
#define ACCEL_THRESHOLD_Y 0.5f // Condition 2: |y| > 0.5g
#define ACCEL_THRESHOLD_Z 0.5f // Condition 3: |z| < 0.5g
/* ============================================================================
* PRIVATE FUNCTION DECLARATIONS
* ============================================================================ */
static void sampling_timer_callback(void *arg);
static void producer_task(void *pvParameters); // Runs on Core 0
static void consumer_task(void *pvParameters); // Runs on Core 1
static void update_stats_acquisition(uint64_t time_us);
static void update_stats_processing(uint64_t time_us);
/* ============================================================================
* PRIVATE FUNCTION IMPLEMENTATIONS
* ============================================================================ */
/**
 * @brief Periodic timer callback: asks the producer task to take one sample.
 *
 * The sensor is not read here. The callback only sets bit 0 of the producer
 * task's notification value, and only when the architecture is running, a
 * sensor handle has been registered and the producer task exists.
 *
 * @param arg Unused (the timer is created with a NULL argument).
 */
static void sampling_timer_callback(void *arg)
{
    (void)arg;

    const bool sampling_enabled = s_is_running && (s_adxl355_handle != NULL);

    if (sampling_enabled && (s_producer_task_handle != NULL))
    {
        xTaskNotify(s_producer_task_handle, 1, eSetBits);
    }
}
/**
 * @brief Fold one acquisition duration into the running average.
 *
 * While no sample has been counted yet (total_samples == 0) the measurement
 * seeds the average directly. Afterwards it is blended in as an exponential
 * moving average: 0.9 x previous average + 0.1 x new measurement.
 *
 * @param time_us Duration of the latest sensor read, as measured by the caller.
 */
static void update_stats_acquisition(uint64_t time_us)
{
    const float latest = (float)time_us;
    const bool is_first_measurement = (s_stats.total_samples == 0);

    s_stats.avg_acquisition_time_us = is_first_measurement
        ? latest
        : (s_stats.avg_acquisition_time_us * 0.9f) + (latest * 0.1f);
}
/**
 * @brief Fold one buffer-processing duration into the running average.
 *
 * The first measurement (processed_samples still 0) becomes the average as-is;
 * every later one is mixed in with weight 0.1 against 0.9 for the previous
 * average, i.e. an exponential moving average.
 *
 * @param time_us Time spent processing the latest buffer, as measured by the caller.
 */
static void update_stats_processing(uint64_t time_us)
{
    const float latest = (float)time_us;

    if (s_stats.processed_samples != 0)
    {
        s_stats.avg_process_time_us =
            (s_stats.avg_process_time_us * 0.9f) + (latest * 0.1f);
        return;
    }

    // Nothing processed so far: seed the average with this measurement
    s_stats.avg_process_time_us = latest;
}
/**
* @brief Producer task: Reads sensor data and fills double buffers (Core 0)
* @note This task is pinned to Core 0 for dedicated data acquisition
*/
static void producer_task(void *pvParameters)
{
rt_process_sample_t sample;
uint32_t notification_value;
int64_t start_time;
ESP_LOGI(TAG, "Producer task started on Core %d (DMA + Dual Core mode)",
xPortGetCoreID());
while (s_is_running)
{
// Wait for timer notification
if (xTaskNotifyWait(0, ULONG_MAX, ¬ification_value, portMAX_DELAY) == pdTRUE)
{
if (!s_is_running || s_adxl355_handle == NULL)
{
break;
}
start_time = esp_timer_get_time();
// Read sensor data (DMA will be used automatically by SPI driver)
adxl355_accelerations_t accel;
float temperature;
esp_err_t accel_ret = adxl355_read_accelerations(s_adxl355_handle, &accel);
esp_err_t temp_ret = adxl355_read_temperature(s_adxl355_handle, &temperature);
if (accel_ret == ESP_OK && temp_ret == ESP_OK)
{
// Prepare sample
sample.x = accel.x;
sample.y = accel.y;
sample.z = accel.z;
sample.temp = temperature;
sample.timestamp_us = esp_timer_get_time();
// Update acquisition statistics
update_stats_acquisition(esp_timer_get_time() - start_time);
// Write to active buffer (A or B)
uint32_t active = s_double_buffer.active_buffer;
SemaphoreHandle_t mutex = (active == 0) ? s_double_buffer.buffer_a_mutex : s_double_buffer.buffer_b_mutex;
rt_process_sample_t *target_buffer = (active == 0) ? s_double_buffer.buffer_a : s_double_buffer.buffer_b;
volatile uint32_t *write_idx = (active == 0) ? &s_double_buffer.buffer_a_write_idx : &s_double_buffer.buffer_b_write_idx;
if (xSemaphoreTake(mutex, pdMS_TO_TICKS(10)) == pdTRUE)
{
// Write sample to current buffer
target_buffer[*write_idx] = sample;
(*write_idx)++;
s_double_buffer.total_writes++;
// Check if buffer is full
if (*write_idx >= s_double_buffer.buffer_size)
{
// Buffer full: mark as ready and switch to other buffer
if (active == 0)
{
s_double_buffer.buffer_a_ready = true;
s_double_buffer.active_buffer = 1; // Switch to buffer B
s_double_buffer.buffer_a_write_idx = 0; // Reset write index
xSemaphoreGive(s_double_buffer.buffer_a_sem); // Notify consumer (Core 1)
}
else
{
s_double_buffer.buffer_b_ready = true;
s_double_buffer.active_buffer = 0; // Switch to buffer A
s_double_buffer.buffer_b_write_idx = 0; // Reset write index
xSemaphoreGive(s_double_buffer.buffer_b_sem); // Notify consumer (Core 1)
}
// Check for overwrite (if consumer didn't process in time)
if ((active == 0 && s_double_buffer.buffer_a_ready) ||
(active == 1 && s_double_buffer.buffer_b_ready))
{
s_double_buffer.overwrite_count++;
ESP_LOGW(TAG, "Buffer overwrite detected (buffer %s)", active == 0 ? "A" : "B");
}
}
xSemaphoreGive(mutex);
s_stats.total_samples++;
}
else
{
s_stats.dropped_samples++;
ESP_LOGW(TAG, "Mutex timeout, sample dropped");
}
}
else
{
ESP_LOGE(TAG, "Failed to read sensor: accel=%s, temp=%s",
esp_err_to_name(accel_ret), esp_err_to_name(temp_ret));
}
}
}
ESP_LOGI(TAG, "Producer task stopped (Core %d)", xPortGetCoreID());
vTaskDelete(NULL);
}
/**
* @brief Consumer task: Processes filled buffers from double buffer (Core 1)
* @note This task is pinned to Core 1 for dedicated data processing
*/
static void consumer_task(void *pvParameters)
{
ESP_LOGI(TAG, "Consumer task started on Core %d (DMA + Dual Core mode)",
xPortGetCoreID());
while (s_is_running)
{
// Wait for either buffer A or buffer B to be ready
uint32_t buffer_to_process = 0; // 0 = buffer A, 1 = buffer B
bool buffer_ready = false;
// Wait for buffer A or buffer B to be ready (with timeout)
if (xSemaphoreTake(s_double_buffer.buffer_a_sem, pdMS_TO_TICKS(100)) == pdTRUE)
{
buffer_to_process = 0;
buffer_ready = true;
}
else if (xSemaphoreTake(s_double_buffer.buffer_b_sem, pdMS_TO_TICKS(100)) == pdTRUE)
{
buffer_to_process = 1;
buffer_ready = true;
}
if (!s_is_running)
{
break;
}
if (buffer_ready)
{
int64_t process_start_time = esp_timer_get_time();
// Get buffer and mutex for the buffer to process
rt_process_sample_t *process_buffer = (buffer_to_process == 0) ?
s_double_buffer.buffer_a : s_double_buffer.buffer_b;
SemaphoreHandle_t mutex = (buffer_to_process == 0) ?
s_double_buffer.buffer_a_mutex : s_double_buffer.buffer_b_mutex;
uint32_t buffer_size = s_double_buffer.buffer_size;
// Process the buffer
if (xSemaphoreTake(mutex, pdMS_TO_TICKS(100)) == pdTRUE)
{
// Process all samples in the buffer
for (uint32_t i = 0; i < buffer_size; i++)
{
rt_process_sample_t *sample = &process_buffer[i];
// Acceleration detection
if (s_enable_accel_detection)
{
bool condition_met = (fabsf(sample->x) > ACCEL_THRESHOLD_X) ||
(fabsf(sample->y) > ACCEL_THRESHOLD_Y) ||
(fabsf(sample->z) < ACCEL_THRESHOLD_Z);
uint64_t current_time = esp_timer_get_time();
uint16_t target_color;
bool should_update = false;
if (condition_met)
{
target_color = RED;
if (!s_color_hold_active || s_current_lcd_color != RED)
{
s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
s_color_hold_active = true;
should_update = true;
ESP_LOGI(TAG, "LCD -> RED (buffer %s, Core %d)",
buffer_to_process == 0 ? "A" : "B", xPortGetCoreID());
}
else
{
s_color_hold_until_us = current_time + LCD_COLOR_HOLD_DURATION_US;
}
}
else
{
if (s_color_hold_active && current_time < s_color_hold_until_us)
{
target_color = RED;
}
else
{
target_color = WHITE;
if (s_color_hold_active)
{
s_color_hold_active = false;
should_update = true;
ESP_LOGI(TAG, "LCD -> WHITE (hold expired, Core %d)", xPortGetCoreID());
}
}
}
if (should_update && target_color != s_current_lcd_color)
{
uint64_t time_since_last_update = current_time - s_last_lcd_update_us;
if (!s_lcd_update_in_progress && time_since_last_update >= LCD_UPDATE_MIN_INTERVAL_US)
{
s_lcd_update_in_progress = true;
lcd_clear(target_color);
s_current_lcd_color = target_color;
s_last_lcd_update_us = current_time;
s_lcd_update_in_progress = false;
}
}
}
// Serial output (if enabled)
if (s_config.enable_serial && i == 0) // Output first sample only
{
printf("BUFFER_%s_CORE1: x=%.3f,y=%.3f,z=%.3f,temp=%.1f\n",
buffer_to_process == 0 ? "A" : "B",
sample->x, sample->y, sample->z, sample->temp);
}
}
// Mark buffer as processed and reset ready flag
if (buffer_to_process == 0)
{
s_double_buffer.buffer_a_ready = false;
}
else
{
s_double_buffer.buffer_b_ready = false;
}
xSemaphoreGive(mutex);
// Update processing statistics
uint64_t process_time_us = esp_timer_get_time() - process_start_time;
update_stats_processing(process_time_us);
s_stats.processed_samples += buffer_size;
// Log periodically
static uint32_t log_counter = 0;
if (++log_counter >= 10)
{
ESP_LOGI(TAG, "Buffer %s processed on Core %d: %u samples, process_time=%llu us",
buffer_to_process == 0 ? "A" : "B", xPortGetCoreID(), buffer_size, process_time_us);
log_counter = 0;
}
}
else
{
ESP_LOGW(TAG, "Failed to acquire mutex for buffer %s", buffer_to_process == 0 ? "A" : "B");
}
}
}
ESP_LOGI(TAG, "Consumer task stopped (Core %d)", xPortGetCoreID());
vTaskDelete(NULL);
}
/* ============================================================================
* PUBLIC FUNCTION IMPLEMENTATIONS
* ============================================================================ */
esp_err_t arch_dma_dc_init(const rt_process_config_t *config)
{
if (s_is_initialized)
{
ESP_LOGW(TAG, "Architecture already initialized");
return ESP_ERR_INVALID_STATE;
}
// Apply configuration
if (config != NULL)
{
memcpy(&s_config, config, sizeof(rt_process_config_t));
}
else
{
// Use defaults
s_config.sampling_frequency_hz = 100.0f;
s_config.queue_size = RT_DMA_DC_BUFFER_SIZE;
s_config.enable_mqtt = false;
s_config.enable_serial = true;
s_config.enable_accel_detection = false;
s_config.mqtt_topic = NULL;
}
// Set acceleration detection state from config
s_enable_accel_detection = s_config.enable_accel_detection;
// Validate configuration
if (s_config.sampling_frequency_hz <= 0.0f || s_config.sampling_frequency_hz > 10000.0f)
{
ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz (valid range: 0.1 - 10000 Hz)",
s_config.sampling_frequency_hz);
return ESP_ERR_INVALID_ARG;
}
// Initialize double buffer
s_double_buffer.buffer_size = RT_DMA_DC_BUFFER_SIZE;
s_double_buffer.buffer_a_write_idx = 0;
s_double_buffer.buffer_b_write_idx = 0;
s_double_buffer.buffer_a_ready = false;
s_double_buffer.buffer_b_ready = false;
s_double_buffer.active_buffer = 0; // Start with buffer A
s_double_buffer.overwrite_count = 0;
s_double_buffer.total_writes = 0;
// Allocate buffers from PSRAM
size_t required_bytes = RT_DMA_DC_BUFFER_SIZE * sizeof(rt_process_sample_t);
size_t psram_free = heap_caps_get_free_size(MALLOC_CAP_SPIRAM);
if (psram_free < required_bytes * 2)
{
ESP_LOGE(TAG, "PSRAM insufficient: available %zu bytes, need %zu bytes",
psram_free, required_bytes * 2);
return ESP_ERR_NO_MEM;
}
// Allocate buffer A
s_double_buffer.buffer_a = (rt_process_sample_t *)heap_caps_malloc(
required_bytes,
MALLOC_CAP_SPIRAM
);
if (s_double_buffer.buffer_a == NULL)
{
ESP_LOGE(TAG, "Failed to allocate buffer A from PSRAM");
return ESP_ERR_NO_MEM;
}
memset(s_double_buffer.buffer_a, 0, required_bytes);
// Allocate buffer B
s_double_buffer.buffer_b = (rt_process_sample_t *)heap_caps_malloc(
required_bytes,
MALLOC_CAP_SPIRAM
);
if (s_double_buffer.buffer_b == NULL)
{
ESP_LOGE(TAG, "Failed to allocate buffer B from PSRAM");
heap_caps_free(s_double_buffer.buffer_a);
s_double_buffer.buffer_a = NULL;
return ESP_ERR_NO_MEM;
}
memset(s_double_buffer.buffer_b, 0, required_bytes);
// Create mutexes
s_double_buffer.buffer_a_mutex = xSemaphoreCreateMutex();
s_double_buffer.buffer_b_mutex = xSemaphoreCreateMutex();
if (s_double_buffer.buffer_a_mutex == NULL || s_double_buffer.buffer_b_mutex == NULL)
{
ESP_LOGE(TAG, "Failed to create mutexes");
if (s_double_buffer.buffer_a != NULL) heap_caps_free(s_double_buffer.buffer_a);
if (s_double_buffer.buffer_b != NULL) heap_caps_free(s_double_buffer.buffer_b);
if (s_double_buffer.buffer_a_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
if (s_double_buffer.buffer_b_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
return ESP_ERR_NO_MEM;
}
// Create semaphores (binary semaphores for buffer ready signals)
s_double_buffer.buffer_a_sem = xSemaphoreCreateBinary();
s_double_buffer.buffer_b_sem = xSemaphoreCreateBinary();
if (s_double_buffer.buffer_a_sem == NULL || s_double_buffer.buffer_b_sem == NULL)
{
ESP_LOGE(TAG, "Failed to create semaphores");
if (s_double_buffer.buffer_a != NULL) heap_caps_free(s_double_buffer.buffer_a);
if (s_double_buffer.buffer_b != NULL) heap_caps_free(s_double_buffer.buffer_b);
if (s_double_buffer.buffer_a_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
if (s_double_buffer.buffer_b_mutex != NULL) vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
return ESP_ERR_NO_MEM;
}
// Initialize statistics
memset(&s_stats, 0, sizeof(rt_process_stats_t));
s_is_initialized = true;
ESP_LOGI(TAG, "DMA + Dual Core architecture initialized:");
ESP_LOGI(TAG, " - Sampling frequency: %.2f Hz", s_config.sampling_frequency_hz);
ESP_LOGI(TAG, " - Buffer size: %u samples per buffer", RT_DMA_DC_BUFFER_SIZE);
ESP_LOGI(TAG, " - Total buffer memory: %zu bytes (PSRAM)", required_bytes * 2);
ESP_LOGI(TAG, " - Producer task: Core %d", RT_DMA_DC_PRODUCER_CORE);
ESP_LOGI(TAG, " - Consumer task: Core %d", RT_DMA_DC_CONSUMER_CORE);
ESP_LOGI(TAG, " - MQTT enabled: %s", s_config.enable_mqtt ? "Yes" : "No");
ESP_LOGI(TAG, " - Serial output enabled: %s", s_config.enable_serial ? "Yes" : "No");
return ESP_OK;
}
/**
 * @brief Register the ADXL355 handle the producer task will read from.
 *
 * The pointer is stored as-is; no copy of the handle is made.
 *
 * @param handle Sensor handle; must not be NULL.
 * @return ESP_OK once stored, ESP_ERR_INVALID_ARG if @p handle is NULL.
 */
esp_err_t arch_dma_dc_set_sensor_handle(adxl355_handle_t *handle)
{
    esp_err_t result = ESP_ERR_INVALID_ARG;

    if (handle != NULL)
    {
        s_adxl355_handle = handle;
        ESP_LOGI(TAG, "Sensor handle set");
        result = ESP_OK;
    }

    return result;
}
/**
 * @brief Start sampling: create the timer, spawn both tasks, start the timer.
 *
 * Order of operations: the sampling timer is created, s_is_running is raised,
 * the producer and consumer tasks are created pinned to their configured
 * cores, one sample is requested immediately, and finally the periodic timer
 * is started. If any step after timer creation fails, whatever was created is
 * torn down again and s_is_running is lowered.
 *
 * @return ESP_OK on success;
 *         ESP_ERR_INVALID_STATE if not initialized, already running, or no
 *         sensor handle has been set;
 *         ESP_ERR_INVALID_ARG if the configured frequency yields a period below 100 us;
 *         ESP_ERR_NO_MEM if a task cannot be created;
 *         otherwise the error reported by the esp_timer API.
 */
esp_err_t arch_dma_dc_start(void)
{
    if (!s_is_initialized)
    {
        ESP_LOGE(TAG, "Architecture not initialized");
        return ESP_ERR_INVALID_STATE;
    }
    if (s_is_running)
    {
        ESP_LOGW(TAG, "Architecture already running");
        return ESP_ERR_INVALID_STATE;
    }
    if (s_adxl355_handle == NULL)
    {
        ESP_LOGE(TAG, "Sensor handle not set");
        return ESP_ERR_INVALID_STATE;
    }

    // Timer period in microseconds derived from the configured rate
    const uint64_t period_us = (uint64_t)(1000000.0f / s_config.sampling_frequency_hz);
    if (period_us < 100)
    {
        ESP_LOGE(TAG, "Sampling frequency too high: %.2f Hz", s_config.sampling_frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    esp_timer_create_args_t timer_args = {
        .callback = sampling_timer_callback,
        .arg = NULL,
        .name = "rt_dma_dc_sampling_timer"
    };
    esp_err_t ret = esp_timer_create(&timer_args, &s_sampling_timer);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to create timer: %s", esp_err_to_name(ret));
        return ret;
    }

    // Both task loops test this flag, so raise it before they are created
    s_is_running = true;

    if (xTaskCreatePinnedToCore(producer_task,
                                "rt_dma_dc_producer",
                                RT_DMA_DC_PRODUCER_STACK_SIZE,
                                NULL,
                                RT_DMA_DC_PRODUCER_PRIORITY,
                                &s_producer_task_handle,
                                RT_DMA_DC_PRODUCER_CORE) != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create producer task on Core %d", RT_DMA_DC_PRODUCER_CORE);
        ret = ESP_ERR_NO_MEM;
        goto rollback;
    }

    if (xTaskCreatePinnedToCore(consumer_task,
                                "rt_dma_dc_consumer",
                                RT_DMA_DC_CONSUMER_STACK_SIZE,
                                NULL,
                                RT_DMA_DC_CONSUMER_PRIORITY,
                                &s_consumer_task_handle,
                                RT_DMA_DC_CONSUMER_CORE) != pdPASS)
    {
        ESP_LOGE(TAG, "Failed to create consumer task on Core %d", RT_DMA_DC_CONSUMER_CORE);
        ret = ESP_ERR_NO_MEM;
        goto rollback;
    }

    // Request the first sample right away instead of waiting one full period
    sampling_timer_callback(NULL);

    ret = esp_timer_start_periodic(s_sampling_timer, period_us);
    if (ret != ESP_OK)
    {
        ESP_LOGE(TAG, "Failed to start timer: %s", esp_err_to_name(ret));
        goto rollback;
    }

    ESP_LOGI(TAG, "DMA + Dual Core architecture started:");
    ESP_LOGI(TAG, " - Frequency: %.2f Hz, period: %llu us", s_config.sampling_frequency_hz, period_us);
    ESP_LOGI(TAG, " - Producer task: Core %d", RT_DMA_DC_PRODUCER_CORE);
    ESP_LOGI(TAG, " - Consumer task: Core %d", RT_DMA_DC_CONSUMER_CORE);
    return ESP_OK;

rollback:
    // Tear down in creation order; a task that was never created still has a
    // NULL handle and is skipped.
    if (s_producer_task_handle != NULL)
    {
        vTaskDelete(s_producer_task_handle);
        s_producer_task_handle = NULL;
    }
    if (s_consumer_task_handle != NULL)
    {
        vTaskDelete(s_consumer_task_handle);
        s_consumer_task_handle = NULL;
    }
    esp_timer_delete(s_sampling_timer);
    s_sampling_timer = NULL;
    s_is_running = false;
    return ret;
}
esp_err_t arch_dma_dc_stop(void)
{
if (!s_is_running)
{
ESP_LOGW(TAG, "Architecture not running");
return ESP_ERR_INVALID_STATE;
}
s_is_running = false;
// Stop timer
if (s_sampling_timer != NULL)
{
esp_timer_stop(s_sampling_timer);
esp_timer_delete(s_sampling_timer);
s_sampling_timer = NULL;
}
// Tasks will exit when s_is_running becomes false
// Wait a bit for tasks to finish
vTaskDelay(pdMS_TO_TICKS(100));
// Force delete tasks if still running
if (s_producer_task_handle != NULL)
{
vTaskDelete(s_producer_task_handle);
s_producer_task_handle = NULL;
}
if (s_consumer_task_handle != NULL)
{
vTaskDelete(s_consumer_task_handle);
s_consumer_task_handle = NULL;
}
ESP_LOGI(TAG, "DMA + Dual Core architecture stopped");
return ESP_OK;
}
esp_err_t arch_dma_dc_deinit(void)
{
if (s_is_running)
{
arch_dma_dc_stop();
}
// Delete semaphores
if (s_double_buffer.buffer_a_sem != NULL)
{
vSemaphoreDelete(s_double_buffer.buffer_a_sem);
s_double_buffer.buffer_a_sem = NULL;
}
if (s_double_buffer.buffer_b_sem != NULL)
{
vSemaphoreDelete(s_double_buffer.buffer_b_sem);
s_double_buffer.buffer_b_sem = NULL;
}
// Delete mutexes
if (s_double_buffer.buffer_a_mutex != NULL)
{
vSemaphoreDelete(s_double_buffer.buffer_a_mutex);
s_double_buffer.buffer_a_mutex = NULL;
}
if (s_double_buffer.buffer_b_mutex != NULL)
{
vSemaphoreDelete(s_double_buffer.buffer_b_mutex);
s_double_buffer.buffer_b_mutex = NULL;
}
// Free buffers (PSRAM)
if (s_double_buffer.buffer_a != NULL)
{
heap_caps_free(s_double_buffer.buffer_a);
s_double_buffer.buffer_a = NULL;
}
if (s_double_buffer.buffer_b != NULL)
{
heap_caps_free(s_double_buffer.buffer_b);
s_double_buffer.buffer_b = NULL;
}
s_double_buffer.buffer_size = 0;
s_double_buffer.buffer_a_write_idx = 0;
s_double_buffer.buffer_b_write_idx = 0;
s_double_buffer.buffer_a_ready = false;
s_double_buffer.buffer_b_ready = false;
s_double_buffer.active_buffer = 0;
s_double_buffer.overwrite_count = 0;
s_double_buffer.total_writes = 0;
s_is_initialized = false;
s_adxl355_handle = NULL;
memset(&s_stats, 0, sizeof(rt_process_stats_t));
ESP_LOGI(TAG, "DMA + Dual Core architecture deinitialized");
return ESP_OK;
}
/**
 * @brief Report the current run state and buffer fill level.
 *
 * buffer_usage is the sum of both buffers' write indices, or 0 when the
 * buffers are not allocated; queue_usage mirrors buffer_usage.
 *
 * @param status Output structure; must not be NULL.
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG if @p status is NULL.
 */
esp_err_t arch_dma_dc_get_status(rt_process_status_t *status)
{
    if (status == NULL)
    {
        return ESP_ERR_INVALID_ARG;
    }

    const bool buffers_allocated =
        (s_double_buffer.buffer_a != NULL) && (s_double_buffer.buffer_b != NULL);

    status->is_running = s_is_running;
    status->arch_type = RT_ARCH_DMA_DUAL_CORE;
    status->sampling_frequency_hz = s_config.sampling_frequency_hz;

    if (buffers_allocated)
    {
        status->buffer_usage =
            s_double_buffer.buffer_a_write_idx + s_double_buffer.buffer_b_write_idx;
    }
    else
    {
        status->buffer_usage = 0;
    }
    // The double buffer has no separate queue, so both figures are the same
    status->queue_usage = status->buffer_usage;

    return ESP_OK;
}
/**
 * @brief Copy the current statistics snapshot to the caller.
 *
 * @param stats Destination structure; must not be NULL.
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG if @p stats is NULL.
 */
esp_err_t arch_dma_dc_get_stats(rt_process_stats_t *stats)
{
    const bool have_destination = (stats != NULL);

    if (have_destination)
    {
        memcpy(stats, &s_stats, sizeof(rt_process_stats_t));
    }

    return have_destination ? ESP_OK : ESP_ERR_INVALID_ARG;
}
/**
 * @brief Change the sampling frequency, restarting the architecture if needed.
 *
 * If the architecture is running it is stopped, the new frequency is stored
 * and it is started again; otherwise the value is just stored for the next
 * start.
 *
 * The range test is written as a negated "in range" check so that NaN, for
 * which every ordered comparison is false, is rejected instead of reaching
 * the float-to-integer period conversion in arch_dma_dc_start().
 *
 * @param frequency_hz New sampling frequency in Hz; must be in (0, 10000].
 * @return ESP_OK on success;
 *         ESP_ERR_INVALID_ARG if @p frequency_hz is out of range or NaN;
 *         otherwise the error returned by arch_dma_dc_start() when the restart
 *         fails (the architecture is then left stopped with the new frequency stored).
 */
esp_err_t arch_dma_dc_set_frequency(float frequency_hz)
{
    if (!(frequency_hz > 0.0f && frequency_hz <= 10000.0f))
    {
        ESP_LOGE(TAG, "Invalid sampling frequency: %.2f Hz", frequency_hz);
        return ESP_ERR_INVALID_ARG;
    }

    const bool was_running = s_is_running;
    if (was_running)
    {
        arch_dma_dc_stop();
    }

    s_config.sampling_frequency_hz = frequency_hz;
    ESP_LOGI(TAG, "Sampling frequency updated to %.2f Hz", frequency_hz);

    if (!was_running)
    {
        return ESP_OK;
    }

    const esp_err_t restart_ret = arch_dma_dc_start();
    if (restart_ret != ESP_OK)
    {
        // Make the "was running, now stopped" outcome visible to whoever reads the log
        ESP_LOGE(TAG, "Restart after frequency change failed: %s", esp_err_to_name(restart_ret));
    }
    return restart_ret;
}
/**
 * @brief Report whether the architecture is currently running.
 *
 * @param is_running Receives the current value of the running flag; must not be NULL.
 * @return ESP_OK on success, ESP_ERR_INVALID_ARG if @p is_running is NULL.
 */
esp_err_t arch_dma_dc_is_running(bool *is_running)
{
    if (is_running != NULL)
    {
        *is_running = s_is_running;
        return ESP_OK;
    }

    return ESP_ERR_INVALID_ARG;
}
/**
 * @brief Enable or disable acceleration detection with LCD feedback.
 *
 * Either way the LCD is cleared to WHITE and any pending RED hold is
 * cancelled, so detection always (re)starts from the normal state.
 *
 * @param enable true to enable detection, false to disable it.
 * @return Always ESP_OK.
 */
esp_err_t arch_dma_dc_set_accel_detection(bool enable)
{
    s_enable_accel_detection = enable;

    // Common baseline for both transitions: white screen, no hold pending
    lcd_clear(WHITE);
    s_current_lcd_color = WHITE;
    s_color_hold_active = false;
    s_color_hold_until_us = 0;

    if (!enable)
    {
        ESP_LOGI(TAG, "Acceleration detection disabled");
        return ESP_OK;
    }

    ESP_LOGI(TAG, "Acceleration detection enabled with LCD feedback (0.3s persistence)");
    return ESP_OK;
}
Key Implementation¶
Core Pinning¶
Tasks are explicitly pinned to specific cores using xTaskCreatePinnedToCore():
// Producer on Core 0
xTaskCreatePinnedToCore(
producer_task,
"rt_dma_dc_producer",
RT_DMA_DC_PRODUCER_STACK_SIZE,
NULL,
RT_DMA_DC_PRODUCER_PRIORITY,
&s_producer_task_handle,
RT_DMA_DC_PRODUCER_CORE // Core 0
);
// Consumer on Core 1
xTaskCreatePinnedToCore(
consumer_task,
"rt_dma_dc_consumer",
RT_DMA_DC_CONSUMER_STACK_SIZE,
NULL,
RT_DMA_DC_CONSUMER_PRIORITY,
&s_consumer_task_handle,
RT_DMA_DC_CONSUMER_CORE // Core 1
);
Cross-Core Synchronization¶
Mutexes and semaphores work across cores:
- Mutexes: Protect buffer access between Core 0 (producer) and Core 1 (consumer)
- Semaphores: Signal buffer ready state across cores
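- Volatile Flags: Stop the compiler from caching the ready flags and write indices in registers; the actual cross-core ordering and visibility guarantees come from the mutexes and semaphores above, not from `volatile` itself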
Producer Task (Core 0)¶
The producer task runs exclusively on Core 0:
- Core Verification: Logs core ID on startup for verification
- Dedicated CPU: Has dedicated CPU time for acquisition
- No Interference: Not affected by processing workload on Core 1
Consumer Task (Core 1)¶
The consumer task runs exclusively on Core 1:
- Core Verification: Logs core ID on startup for verification
- Dedicated CPU: Has dedicated CPU time for processing
- No Interference: Not affected by acquisition workload on Core 0
Usage Example¶
// Select Architecture 3
#define RT_PROCESS_ARCH_TYPE RT_ARCH_DMA_DUAL_CORE
#include "real-time-process-arch.h"
// Initialize
rt_process_config_t config = {
.sampling_frequency_hz = 2000.0f, // 2 kHz
.queue_size = 256,
.enable_mqtt = false,
.enable_serial = true,
.enable_accel_detection = false,
.mqtt_topic = NULL
};
esp_err_t ret = rt_process_init(&config);
rt_process_set_sensor_handle(adxl355_handle);
rt_process_start();
// Verify core assignment (check logs)
// Producer should log: "Producer task started on Core 0"
// Consumer should log: "Consumer task started on Core 1"
// Monitor
rt_process_status_t status;
rt_process_get_status(&status);
ESP_LOGI("App", "Running: %s, Buffer usage: %lu",
status.is_running ? "Yes" : "No", status.buffer_usage);
// Stop
rt_process_stop();
rt_process_deinit();