硬汉嵌入式论坛

 找回密码
 立即注册
查看: 346|回复: 7
收起左侧

[ThreadX全家桶] threadx消息队列与事件组配合使用

[复制链接]

10

主题

36

回帖

66

积分

初级会员

积分
66
发表于 2025-7-17 20:18:06 | 显示全部楼层 |阅读模式
分享一个设计思路,今天遇到AT模块时弄的,场景:AT模块有AT指令的响应体,也有由模块自己的主动上报的消息体,在一个线程中完成对模块所有消息的收发(统一在一个地方收发可以好处是方便接口管理,不需要互斥);
驱动侧:
串口接收数据方式有中断,也可以是DMA + IDLE 或者 DMA + RTO方式,如果是后两者的话,可以在数据半完成中断和完成中断中,直接将数据扔到一个ringbuffer中(当成流处理)
线程侧:

收发均通过消息队列+事件组方式进行,线程通过接收,然后串口驱动侧,再发送数据时,标记收到的消息是否为AT指令的ACK,如果是,则不在事件组置flag,如果不是(表示串口侧主动收到的消息),则在事件组上置位,线程在处理该事件时,可以直接将数据读出来(在ringbuffer拿数据)进行处理或分发至其他组件,也可以通过发送消息队列,统一在消息队列中作处理。


浅聊消息队列:
消息队列是一个比较好用的组件,其好用程度看个人对其理解,在threadx中,消息队列发送可以发送一个指针的地址,即消息体不copy,效率其实是比较高的。所以可以将消息队列单独抽成一个模块,整个软件系统的消息分配和释放,以及各个线程监听的消息都可以在这里注册;同时整个软件系统的消息头都是相同的,唯一不同的就是消息体,消息体的内容可以根据消息事件ID进行区分。
线程将各自监听的消息队列注册到消息模块中,然后监听各自感兴趣的消息队列。发送者只需要标明该消息是发送到哪个模块的,然后交给消息模块进行分发(消息模块不需要独立线程,注册时只需要维护线程句柄和消息队列以及监听的消息ID即可),

当然这个方式不仅限于串口,通讯部分的大体都可以这么处理,当然还有很多细节没照顾到,比如处理消息的逻辑,一个事件尽量处理多消息,不同场景不同处理。
聊的可能逻辑性有点不好,但大体意思应该表达出来了,欢迎交流。
回复

使用道具 举报

1万

主题

7万

回帖

11万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
117512
QQ
发表于 2025-7-18 09:44:59 | 显示全部楼层
谢谢楼主分享。
回复

使用道具 举报

0

主题

6

回帖

6

积分

新手上路

积分
6
发表于 2025-7-18 14:12:52 | 显示全部楼层

谢谢楼主分享。
回复

使用道具 举报

16

主题

231

回帖

279

积分

高级会员

积分
279
发表于 2025-7-18 17:37:15 | 显示全部楼层
你的思路非常清晰,这种设计模式在RTOS中确实能提高消息传递的效率和可维护性。
回复

使用道具 举报

16

主题

231

回帖

279

积分

高级会员

积分
279
发表于 2025-7-18 18:00:41 | 显示全部楼层
本帖最后由 yunqi 于 2025-7-18 18:03 编辑

[C] 纯文本查看 复制代码
// 消息类型定义
typedef enum {
    MSG_SENSOR_DATA,     // 传感器数据
    MSG_BUTTON_EVENT,    // 按钮事件
    MSG_NETWORK_CMD      // 网络命令
} MsgEventID;

// 统一消息头
typedef struct {
    MsgEventID event_id;  // 事件ID
    uint32_t   timestamp; // 时间戳
    uint16_t   src_task;  // 发送方任务ID
} MsgHeader;

// 示例消息体1
typedef struct {
    MsgHeader header;     // 必须包含头
    float     temperature;
    float     humidity;
} SensorMsg;

// 示例消息体2
typedef struct {
    MsgHeader header;
    uint8_t  button_id;
    bool     is_pressed;
} ButtonMsg;

// msg_manager.c
#define MAX_QUEUES 5

static osMessageQueueId_t msg_queues[MAX_QUEUES];

// 初始化所有消息队列
void MsgManager_Init(void) {
    for (int i = 0; i < MAX_QUEUES; i++) {
        msg_queues[i] = osMessageQueueNew(10, sizeof(void*), NULL);
    }
}

// 发送消息(传递指针)
bool MsgManager_Send(MsgEventID id, void* msg, uint32_t timeout) {
    uint32_t queue_idx = id % MAX_QUEUES; // 简单哈希分配
    return osMessageQueuePut(msg_queues[queue_idx], &msg, 0, timeout) == osOK;
}

// 接收消息
bool MsgManager_Receive(osThreadId_t thread, void** msg, uint32_t timeout) {
    uint32_t queue_idx = osThreadGetId() % MAX_QUEUES; // 每个线程监听固定队列
    return osMessageQueueGet(msg_queues[queue_idx], msg, NULL, timeout) == osOK;
}


// mem_pool.c
#define POOL_SIZE 20

static osMemoryPoolId_t msg_pool;

void MemPool_Init(void) {
    msg_pool = osMemoryPoolNew(POOL_SIZE, sizeof(SensorMsg), NULL); // 以最大消息体为准
}

void* MsgAlloc(size_t size) {
    return osMemoryPoolAlloc(msg_pool, 0);
}

void MsgFree(void* msg) {
    osMemoryPoolFree(msg_pool, msg);
}


[C] 纯文本查看 复制代码
// 发送方线程
void SensorThread(void *arg) {
    SensorMsg* msg = MsgAlloc(sizeof(SensorMsg));
    msg->header.event_id = MSG_SENSOR_DATA;
    msg->temperature = 25.5f;
    
    MsgManager_Send(MSG_SENSOR_DATA, msg, osWaitForever);
}

// 接收方线程
void ProcessThread(void *arg) {
    void* received_msg;
    while (1) {
        if (MsgManager_Receive(osThreadGetId(), &received_msg, osWaitForever)) {
            MsgHeader* header = (MsgHeader*)received_msg;
            
            switch (header->event_id) {
                case MSG_SENSOR_DATA: {
                    SensorMsg* sensor_msg = (SensorMsg*)received_msg;
                    printf("Temp: %.1f\n", sensor_msg->temperature);
                    break;
                }
                // 其他消息处理...
            }
            
            MsgFree(received_msg);
        }
    }
}
回复

使用道具 举报

16

主题

231

回帖

279

积分

高级会员

积分
279
发表于 2025-7-18 18:01:12 | 显示全部楼层
关键设计优势:
零拷贝传递:只传递指针,避免消息体复制

统一消息头:所有消息类型可统一识别处理

模块化内存管理:

集中分配/释放消息内存

防止内存碎片(使用内存池)

负载均衡:多个队列通过哈希分配消息

类型安全:通过event_id进行消息类型判别

性能优化点:
可根据消息频率为不同事件ID分配独立队列

添加消息优先级支持(CMSIS-OS2的优先级参数)

实现消息统计功能(发送/接收计数)

添加超时后的自动内存回收机制

这种设计在资源受限的嵌入式系统中特别有效,我在多个工业级项目中采用类似方案,消息吞吐量提升可达3-5倍(相比传统数据拷贝方式)。
回复

使用道具 举报

10

主题

36

回帖

66

积分

初级会员

积分
66
 楼主| 发表于 2025-7-19 17:54:50 | 显示全部楼层
yunqi 发表于 2025-7-18 18:01
关键设计优势:
零拷贝传递:只传递指针,避免消息体复制

回复

使用道具 举报

10

主题

36

回帖

66

积分

初级会员

积分
66
 楼主| 发表于 2025-7-19 18:19:40 | 显示全部楼层
补充几点,
1、可以将消息队列做成消息事件,这样既可以处理带数据通信和不带数据的通信,
2、在资源紧张时,可以用数组data[0]来分配消息内容(有可能会导致内存碎片),好处是消息体长度根据实际的传递消息长度分配,另外通过第一点的方式,即使在中断中也可以使用事件来触发线程分配内存
3、如果消息需要有响应体或者返回值,在消息体或消息头上挂回调函数,可以在send时等处理的同步信号,在回调函数中释放同步信号来实现。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|小黑屋|Archiver|手机版|硬汉嵌入式论坛

GMT+8, 2025-8-12 02:46 , Processed in 0.043897 second(s), 24 queries .

Powered by Discuz! X3.4 Licensed

Copyright © 2001-2023, Tencent Cloud.

快速回复 返回顶部 返回列表