跳转至

MQTT - 消息队列遥测传输协议

消息队列遥测传输协议(MQTT)是一种轻量级的消息传递协议,专为低带宽、高延迟或不可靠网络环境设计。它通常用于物联网(IoT)设备之间的通信。简单来说,MQTT服务器就相当于一个中介,在用户和设备之间传递消息。本节将介绍如何将Arduino链接到MQTT服务器,从而实现远程控制和监控。

下面是本项目中使用的MQTT相关代码文件:

mqtt.hpp

#pragma once
#include <Arduino.h>
#include "config.hpp"
#include "nodestate.hpp"
#include <WiFiS3.h>
#include <PubSubClient.h>
#include "time.hpp"
#include "sensing.hpp"
#include "rgbled.hpp"

// Create a WiFi client and wrap it in PubSubClient

extern WiFiClient wifi_client;
extern PubSubClient mqtt_client;

// === Retrieval Filename
extern char retrieval_filename[32];

// to slow down MQTT loop
bool should_run_mqtt_loop();

// Initialize and connect to MQTT broker
void mqtt_setup();

// Call this in loop() to keep MQTT alive
void mqtt_loop();

// Publish a test message to broker
void mqtt_publish_test();

mqtt.cpp

#include "mqtt.hpp"


// Create a WiFi client and wrap it in PubSubClient
WiFiClient wifi_client;
PubSubClient mqtt_client(wifi_client);

// Parsed Command Variables
char cmd_sensing_raw[128];
MCUTime parsed_start_time;
uint16_t parsed_freq = 0;
uint16_t parsed_duration = 0;

// === Retrieval Filename
char retrieval_filename[32] = {0}; // Initialize as empty string

// to slow down MQTT loop
bool should_run_mqtt_loop()
{
  static unsigned long last = 0;
  unsigned long now = millis();
  if (now - last >= 500)
  {
    last = now;
    return true;
  }
  return false;
}

// Callback when subscribed message is received
void mqtt_callback(char *topic, byte *payload, unsigned int length)
{
  Serial.print("[COMMUNICATION] <MQTT> Message received [");
  Serial.print(topic);
  Serial.print("]: ");

  char message[length + 1];
  for (unsigned int i = 0; i < length; ++i)
  {
    message[i] = (char)payload[i];
    Serial.print(message[i]);
  }
  message[length] = '\0';
  Serial.println();

  // Clean trailing \r or \n
  while (length > 0 && (message[length - 1] == '\r' || message[length - 1] == '\n'))
  {
    message[--length] = '\0';
  }

  String msg_str(message);

  if (msg_str == "CMD_NTP")
  {
    node_status.node_flags.gateway_ntp_required = true;
    node_status.node_flags.leafnode_ntp_required = true;
    Serial.println("[COMMUNICATION] <CMD> CMD_NTP received.");

    // switch to COMMUNICATING state
    node_status.set_state(NodeState::COMMUNICATING);
    rgbled_set_all(CRGB::Blue); // Set LED to blue during NTP sync
  }
  else if (msg_str == "CMD_GATEWAY_NTP")
  {
    node_status.node_flags.gateway_ntp_required = true;
    Serial.println("[COMMUNICATION] <CMD> CMD_GATEWAY_NTP received.");

    // switch to COMMUNICATING state
    node_status.set_state(NodeState::COMMUNICATING);
    rgbled_set_all(CRGB::Blue); // Set LED to blue during NTP sync
  }
  else if (msg_str == "CMD_LEAFNODE_NTP")
  {
    node_status.node_flags.leafnode_ntp_required = true;
    Serial.println("[COMMUNICATION] <CMD> CMD_LEAFNODE_NTP received.");

    // switch to COMMUNICATING state
    node_status.set_state(NodeState::COMMUNICATING);
    rgbled_set_all(CRGB::Blue); // Set LED to blue during NTP sync
  }
  else if (msg_str.startsWith("CMD_SENSING_"))
  {
    strncpy(cmd_sensing_raw, message, sizeof(cmd_sensing_raw) - 1);
    cmd_sensing_raw[sizeof(cmd_sensing_raw) - 1] = '\0';
    node_status.node_flags.sensing_requested = true;
    Serial.println("[COMMUNICATION] <CMD> CMD_SENSING received.");

    int y, mo, d, h, mi, s;
    int rate, dur;
    int matched = sscanf(message,
                         "CMD_SENSING_%d-%d-%d_%d:%d:%d_%dHz_%ds",
                         &y, &mo, &d, &h, &mi, &s, &rate, &dur);
    int ms_value = 0;
    if (matched == 8)
    {
      parsed_start_time.year = (uint16_t)y;
      parsed_start_time.month = (uint8_t)mo;
      parsed_start_time.day = (uint8_t)d;
      parsed_start_time.hour = (uint8_t)h;
      parsed_start_time.minute = (uint8_t)mi;
      parsed_start_time.second = (uint8_t)s;
      parsed_start_time.ms = ms_value;

      parsed_freq = (uint16_t)rate;
      parsed_duration = (uint16_t)dur;

      sensing_scheduled_start_ms = parsed_start_time.compute_ms_from_calendar();
      SensingSchedule.unix_ms = sensing_scheduled_start_ms;
      SensingSchedule.unix_epoch = sensing_scheduled_start_ms / 1000;
      SensingSchedule.set_calendar(); // Update calendar fields based on scheduled start time
      sensing_scheduled_end_ms = sensing_scheduled_start_ms + (parsed_duration * 1000); // ms

      sensing_rate_hz = parsed_freq;
      sensing_duration_s = parsed_duration;

      node_status.node_flags.sensing_scheduled = true;

      char buf[128];
      snprintf(buf, sizeof(buf), "[MQTT] Sensing scheduled, sampling at %d Hz for %d seconds, starting at %04d-%02d-%02d %02d:%02d:%02d",
               parsed_freq, parsed_duration,
               parsed_start_time.year, parsed_start_time.month, parsed_start_time.day,
               parsed_start_time.hour, parsed_start_time.minute, parsed_start_time.second);
      Serial.println(buf);
    }
    else
    {
      Serial.println("[MQTT] Failed to parse CMD_SENSING command.");
      node_status.node_flags.sensing_requested = false;
    }
  }
  else if (msg_str.startsWith("CMD_RETRIEVAL_"))
  {
    const char *filename_part = message + 14;
    snprintf(retrieval_filename, sizeof(retrieval_filename), "/%s.txt", filename_part);
    node_status.node_flags.data_retrieval_requested = true;
    node_status.node_flags.data_retrieval_sent = false; // Reset sent flag for new retrieval

    Serial.print("[COMMUNICATION] <CMD> CMD_RETRIEVAL received: ");
    Serial.println(retrieval_filename);

    // switch to COMMUNICATING state
    node_status.set_state(NodeState::COMMUNICATING);
    rgbled_set_all(CRGB::Blue); // Set LED to blue during data retrieval
  }
  else
  {
    Serial.println("[COMMUNICATION] <CMD> Unknown command.");
  }
}

// Connect to MQTT broker
void mqtt_setup()
{
  mqtt_client.setServer(MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT);
  mqtt_client.setCallback(mqtt_callback);

  Serial.println("[INIT] <MQTT> Connecting to broker... ");
  while (!mqtt_client.connected())
  {
    if (mqtt_client.connect(MQTT_CLIENT_ID, MQTT_USERNAME, MQTT_PASSWORD))
    {
      Serial.println("[INIT] <MQTT> Connected.");
      mqtt_client.subscribe(MQTT_TOPIC_SUB);
      node_status.node_flags.mqtt_connected = true;
    }
    else
    {
      Serial.print("[INIT] <MQTT> Failed, Return Code = ");
      Serial.print(mqtt_client.state());
      Serial.println(" -> retrying in 2 sec...");
      delay(2000);
    }
  }
}

// Keep MQTT connection alive
void mqtt_loop()
{
  if (!mqtt_client.connected())
  {
    Serial.println("[COMMUNICATION] <MQTT> Reconnecting...");
    mqtt_setup();
  }
  mqtt_client.loop();
}

// Publish a test message
void mqtt_publish_test()
{
  if (mqtt_client.connected())
  {
    mqtt_client.publish(MQTT_TOPIC_PUB, "Hello from Arduino MQTT!");
    Serial.println("[TEST] <MQTT> Test message published.");
  }
}

!!!note 如上面代码所示,mqtt部分主要是有几个关键内容,包括初始化,循环,发布消息,和回调函数。其中回调部分我们在命令与反馈部分详细介绍

MQTT初始化

如函数mqtt_setup()所示,MQTT初始化主要是设置服务器地址和端口,设置回调函数,并尝试连接到MQTT服务器。如果连接成功,则订阅指定的主题。相关宏已经在config.hpp中定义。

MQTT循环

如函数mqtt_loop()所示,MQTT循环主要是检查连接状态,如果断开则重新连接,每次调用mqtt_loop()都会向MQTT服务器发送心跳包以保持连接活跃。

MQTT发布测试消息

如函数mqtt_publish_test()所示,基于Arduino函数库,在MQTT链接状态下,调用函数mqtt_client.publish()可以向指定主题发布消息。这里我们发布了一个简单的测试消息,该功能是数据上传的基础。注意,每次发送消息,其容量有限制,对于大体量数据,可能需要分段发送或者使用其他方式。

MQTT回调函数

如函数mqtt_callback()所示,当接收到订阅的主题消息时,会调用此回调函数。函数会解析消息内容,并根据不同的命令执行相应的操作。通常,如果对应操作比较耗时,我们会在回调函数中设置标志位或者设置状态机,以便在主循环中处理。在主循环中,根据对应状态和标志量来执行具体的操作。在这里,我们主要基于MQTT回调来实现远程控制。