跳转至

TinyOrch 源码解析

内部查找函数

static int find_flow(const tiny_orch_t *orch, const char *name)

遍历 flows[] 表,匹配 name。所有公开 API 的 flow 查找都通过此函数,避免重复代码。

流程定义

create — 创建流程

检查重名 → 找空闲槽 → 初始化状态为 DEFINED。步骤表在后续 step() 中填充。

step — 添加步骤

tiny_orch_err_t tiny_orch_step(tiny_orch_t *orch, ...)

关键校验链

1. 查 flow 是否存在
2. step_index 是否越界
3. bench_get(src_name) → 存在?类型是什么?
4. bench_get_tool_fn(tool_name) → 注册了吗?输入类型是什么?
5. src_type == tool_in_type ? → 通过
6. 记录 step {src_name, tool_name, dst_name}
7. 更新 step_count = max(step_count, step_index + 1)

步骤索引

index 不必连续。step(flow, 0, ...) + step(flow, 2, ...) 会产生两个步骤,index 1 自动跳过。

执行引擎

execute_step — 执行单步

static tiny_orch_err_t execute_step(tiny_orch_t *orch,
                                     const tiny_orch_step_t *step)

流程

1. bench_get(step.src)           → 获源数据指针 + 类型
2. bench_get_tool_fn(step.tool)  → 获函数指针 + 输入/输出类型
3. fn(src_data, dst_data)        → 调用工具
4. bench_put(step.dst, dst_data, dst_len, fn_out_type)
                                 → 更新目标数据槽的类型

工具函数的 dst_data 来自 bench_get(step.dst)——目标槽必须在执行前已 put 预留。

go — 执行流程

tiny_orch_err_t tiny_orch_go(tiny_orch_t *orch, const char *flow_name)

遍历 steps[],跳过未使用的 index,依次执行 execute_step。每成功一步更新 last_executed。失败时设置状态为 ERROR 并返回具体错误码。

run — 临时单步

构建临时 tiny_orch_step_t,直接调用 execute_step。不修改任何 flow 的状态。

MQTT 命令解析

MQTT 回调中,ORCH,* 命令被解析为对应的 TinyOrch API 调用:

MQTT 命令 C API 调用
ORCH,CREATE,name tiny_orch_create(orch, name)
ORCH,STEP,name,idx,src,tool,dst tiny_orch_step(orch, name, idx, src, tool, dst)
ORCH,GO,name tiny_orch_go(orch, name)
ORCH,RUN,src,tool,dst tiny_orch_run(orch, src, tool, dst)
ORCH,STATUS,name tiny_orch_status(orch, name, &state)
ORCH,LIST tiny_orch_list_flows(orch, names, max)
ORCH,DESTROY,name tiny_orch_destroy(orch, name)

命令先拷贝到 null-terminated 本地缓冲区再解析,避免 MQTT data 非 null-terminated 导致的 sscanf 问题。