ThreadX——IPC應用之消息隊列
作者:zzssdd2
E-mail:[email protected]
一、應用簡介
消息隊列
是RTOS中常用的一種數據通訊方式,常用於任務與任務之間或是中斷與任務之間的數據傳遞。在裸機系統中我們通常會使用全局變數的方式進行數據傳遞,比如在事件發生後在中斷中改變數據和設置標誌,然後在主循環中輪詢不同的標誌是否生效來對全局數據執行不同的操作,執行完畢後清除相關標誌。但是這種方式需要不斷地輪詢標誌狀態,使得CPU的利用率並不高。而使用RTOS的消息隊列則具有任務阻塞機制,當沒有需要處理的消息時任務掛起等待消息,此時其他任務佔用CPU執行其他操作,當有消息放入隊列時任務恢復運行進行消息接收和處理。這種消息處理機制相比裸機而言大大地提高了CPU利用率。
- ThreadX的消息隊列支援「消息置頂通知」功能,也就是可以將消息放在隊列的最前面,使得任務可以及時處理某些緊急消息(RT-Thread的消息隊列也有該功能)
- ThreadX的消息隊列可以傳遞任意長度的數據,因為它是採用傳遞數據指針的方式(uCOS也是採用這種引用傳遞的方式,而FreeRTOS和RT-Thread則支援傳遞整體數據內容。這兩種方式各有優劣吧,指針傳遞方式優點是執行效率高,缺點是存數據的記憶體區域如果數據還未及時處理就被覆寫了那麼就會引發問題;整體數據傳遞方式優點是安全不需擔心數據覆寫致錯,缺點是數據量大的話傳遞數據過程執行時間長導致效率低)
二、API簡介
下面介紹使用ThreadX的消息隊列時常用的幾個API函數。
1、創建消息隊列
- 描述
- 該服務用於創建消息隊列。 消息總數是根據指定的消息大小和隊列中的位元組總數來計算的
- 如果在隊列的記憶體區域中指定的位元組總數不能被指定的消息大小均分,則不會使用該記憶體區域中的其餘位元組
- 參數
- queue_ptr 指向消息隊列控制塊的指針
- name_ptr 指向消息隊列名稱的指針
- message_size 指定隊列中每條消息的大小。 消息大小選項為1個32位字到16個32位字之間(包含)
- queue_start 消息隊列的起始地址。 起始地址必須與ULONG數據類型的大小對齊
- queue_size 消息隊列可用的位元組總數
- 返回值
- TX_SUCCESS (0x00) 創建成功
- TX_QUEUE_ERROR (0x09) 無效的消息隊列指針,指針為NULL或隊列已創建
- TX_PTR_ERROR (0x03) 消息隊列的起始地址無效
- TX_SIZE_ERROR (0x05) 消息隊列大小無效
- TX_CALLER_ERROR (0x13) 該服務的調用者無效
UINT tx_queue_create(
TX_QUEUE *queue_ptr,
CHAR *name_ptr,
UINT message_size,
VOID *queue_start,
ULONG queue_size);
2、刪除消息隊列
- 描述
- 此服務刪除指定的消息隊列。所有掛起等待此隊列消息的執行緒都將恢復,並給出TX_DELETED返回狀態
- 在刪除隊列之前,應用程式必須確保已完成(或禁用)此隊列的所有send_notify回調。 此外,應用程式必須防止將來使用已刪除的隊列
- 應用程式還負責管理與隊列相關聯的記憶體區域,該記憶體區域在此服務完成後可用
- 參數
- queue_ptr 指向先前創建的消息隊列的指針
- 返回值
- TX_SUCCESS (0x00) 刪除成功
- TX_QUEUE_ERROR (0x09) 消息隊列指針無效
- TX_CALLER_ERROR (0x13) 該服務的調用者無效
UINT tx_queue_delete(TX_QUEUE *queue_ptr);
3、清空消息隊列
- 描述
- 此服務刪除存儲在指定消息隊列中的所有消息
- 如果隊列已滿,將丟棄所有掛起執行緒的消息,然後恢復每個掛起的執行緒,並返回一個指示消息發送成功的返回狀態。如果隊列為空,則此服務不執行任何操作。
- 參數
- queue_ptr 指向先前創建的消息隊列的指針
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_QUEUE_ERROR (0x09) 消息隊列指針無效
UINT tx_queue_flush(TX_QUEUE *queue_ptr);
4、消息置頂
- 描述
- 該服務將消息發送到指定消息隊列的最前面。 消息從源指針指定的存儲區域複製到隊列的最前面
- 參數
- queue_ptr 指向消息隊列控制塊的指針
- source_ptr 指向存放消息的指針
- wait_option 定義消息隊列已滿時服務的行為
- TX_NO_WAIT (0x00000000) – 無論是否成功都立即返回(用於非執行緒調用,例如中斷裡面)
- TX_WAIT_FOREVER (0xFFFFFFFF) – 一直等待直到消息隊列有空閑為止
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_DELETED (0x01) 執行緒掛起時,消息隊列被刪除
- TX_QUEUE_FULL (0x0B) 服務無法發送消息,因為在指定的等待時間內隊列已滿
- TX_WAIT_ABORTED (0x1A) 被另一個執行緒、計時器或ISR中斷給中止
- TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
- TX_PTR_ERROR (0x03) 消息的源指針無效
- TX_WAIT_ERROR (0x04) 在非執行緒調用中指定了TX_NO_WAIT以外的等待選項
UINT tx_queue_front_send(
TX_QUEUE *queue_ptr,
VOID *source_ptr,
ULONG wait_option);
5、獲取消息隊列資訊
- 描述
- 該服務檢索有關指定消息隊列的資訊
- 參數(TX_NULL表示不需要獲取該參數代表的資訊)
- queue_ptr 指向先前創建的消息隊列的指針
- name 指向目標的指針,用於指向隊列名稱
- enqueued 指向目標的指針,表示當前隊列中的消息數
- available_storage 指向目標的指針,表示隊列當前有空間容納的消息數
- first_suspended 指向目標的指針,該指針指向該隊列的掛起列表中第一個執行緒
- suspended_count 指向目標的指針,用於指示當前在此隊列上掛起的執行緒數
- next_queue 指向下一個創建隊列的指針的目標的指針
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
UINT tx_queue_info_get(
TX_QUEUE *queue_ptr,
CHAR **name,
ULONG *enqueued,
ULONG *available_storage
TX_THREAD **first_suspended,
ULONG *suspended_count,
TX_QUEUE **next_queue);
6、從隊列獲取消息
- 描述
- 該服務從指定的消息隊列中檢索消息。 檢索到的消息從隊列複製到目標指針指定的存儲區域。 然後將該消息從隊列中刪除
- 指定的目標存儲區必須足夠大以容納消息。 也就是說,由destination_ptr 指向的消息目標必須至少與此隊列的消息大小一樣大。 否則,如果目標不夠大,則會在存儲區域中發生記憶體地址非法錯誤
- 參數
- queue_ptr 指向先前創建的消息隊列的指針
- destination_ptr 指向儲存消息的地址
- wait_option 定義消息隊列為空時服務的行為
- TX_NO_WAIT (0x00000000) – 無論是否成功都立即返回(用於非執行緒調用,例如中斷裡面)
- TX_WAIT_FOREVER (0xFFFFFFFF) – 一直等待直到有消息可以獲取
- 0x00000001 ~ 0xFFFFFFFE– 指定具體等待心跳節拍數(如果心跳頻率1KHZ,那麼單位就是ms )
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_DELETED (0x01) 執行緒掛起時刪除了消息隊列
- TX_QUEUE_EMPTY (0x0A) 服務無法檢索消息,因為隊列在指定的等待時間段內為空
- TX_WAIT_ABORTED (0x1A) 被另一個執行緒、計時器或ISR中斷給中止
- TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
- TX_PTR_ERROR (0x03) 消息的目標指針無效
- TX_WAIT_ERROR (0x04) 在非執行緒調用中指定了TX_NO_WAIT以外的等待選項
UINT tx_queue_receive(
TX_QUEUE *queue_ptr,
VOID *destination_ptr,
ULONG wait_option);
7、向隊列發送消息
- 描述
- 此服務將消息發送到指定的消息隊列。發送的消息將從源指針指定的記憶體區域複製到隊列中。
- 參數
- queue_ptr 指向先前創建的消息隊列的指針
- source_ptr 指向消息的指針
- wait_option 定義消息隊列已滿時服務的行為
- TX_NO_WAIT (0x00000000) – 無論是否成功都立即返回(用於非執行緒調用,例如中斷裡面)
- TX_WAIT_FOREVER (0xFFFFFFFF) – 一直等待直到隊列有空位可以放置消息
- 0x00000001 ~ 0xFFFFFFFE – 指定具體等待心跳節拍數(如果心跳頻率1KHZ,那麼單位就是ms )
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_DELETED (0x01) 執行緒掛起時刪除了消息隊列
- TX_QUEUE_FULL (0x0B) 服務無法發送消息,因為隊列在指定的等待時間內已滿
- TX_WAIT_ABORTED (0x1A) 被另一個執行緒、計時器或ISR中斷給中止
- TX_QUEUE_ERROR (0x09) 無效的消息隊列指針
- TX_PTR_ERROR (0x03) 消息的目標指針無效
- TX_WAIT_ERROR (0x04) 在非執行緒調用中指定了TX_NO_WAIT以外的等待選項
UINT tx_queue_send(
TX_QUEUE *queue_ptr,
VOID *source_ptr,
ULONG wait_option);
8、註冊發送通知回調函數
- 描述
- 此服務註冊一個通知回調函數,每當一條消息發送到指定的隊列時就會調用該函數。 通知回調的處理由應用程式定義
- 不允許在應用程式的隊列發送通知回調函數中調用具有暫停選項的ThreadX API
- 參數
- queue_ptr 指向先前創建的隊列的指針
- queue_send_notify 指嚮應用程式隊列發送通知功能的指針。 如果此值為TX_NULL,則禁用通知
- 返回值
- TX_SUCCESS (0x00) 操作成功
- TX_QUEUE_ERROR (0x09) 無效的隊列指針
- TX_FEATURE_NOT_ENABLED (0xFF) 禁用了通知功能
UINT tx_queue_send_notify(
TX_QUEUE *queue_ptr,
VOID (*queue_send_notify)(TX_QUEUE *));
三、實例演示
- 該應用實例創建三個任務和一個隊列消息發送通知回調
- 任務1:按鍵1按一次向消息隊列1發送一條消息(單個變數消息)
- 任務2:按鍵2按一次向消息隊列2發送一條消息(結構體指針消息)
- 任務3:向消息隊列3發送消息;接收任務1和任務2的消息並列印輸出消息內容
- 回調功能:輸出消息隊列3的相關資訊
創建消息隊列
#define DEMO_STACK_SIZE (2 * 1024)
#define DEMO_BYTE_POOL_SIZE (32 * 1024)
TX_THREAD thread_0;
TX_THREAD thread_1;
TX_THREAD thread_2;
TX_BYTE_POOL byte_pool_0;
UCHAR memory_area[DEMO_BYTE_POOL_SIZE];
/* 消息隊列 */
TX_QUEUE tx_queue1;
TX_QUEUE tx_queue2;
TX_QUEUE tx_queue3;
ULONG msg_queue1[32];
ULONG msg_queue2[16];
ULONG msg_queue3[8];
struct S_DATA{
uint32_t id;
uint16_t flag;
uint8_t msg[2];
};
struct S_DATA data_package;
void thread_0_entry(ULONG thread_input);
void thread_1_entry(ULONG thread_input);
void thread_2_entry(ULONG thread_input);
void queue3_send_notify(TX_QUEUE *input);
void tx_application_define(void *first_unused_memory)
{
CHAR *pointer = TX_NULL;
/* Create a byte memory pool from which to allocate the thread stacks. */
tx_byte_pool_create(&byte_pool_0, "byte pool 0", memory_area, DEMO_BYTE_POOL_SIZE);
/* Allocate the stack for thread 0. */
tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
/* Create the main thread. */
tx_thread_create(&thread_0, "thread 0", thread_0_entry, 0,
pointer, DEMO_STACK_SIZE,
1, 1, TX_NO_TIME_SLICE, TX_AUTO_START);
/* Allocate the stack for thread 1. */
tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
/* Create threads 1 */
tx_thread_create(&thread_1, "thread 1", thread_1_entry, 0,
pointer, DEMO_STACK_SIZE,
2, 2, TX_NO_TIME_SLICE, TX_AUTO_START);
/* Allocate the stack for thread 2. */
tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
/* Create threads 1 */
tx_thread_create(&thread_2, "thread 2", thread_2_entry, 0,
pointer, DEMO_STACK_SIZE,
3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);
/* 創建消息隊列 */
tx_queue_create(&tx_queue1, "tx_queue1", 1, msg_queue1, sizeof(msg_queue1));
tx_queue_create(&tx_queue2, "tx_queue2", 1, msg_queue2, sizeof(msg_queue2));
tx_queue_create(&tx_queue3, "tx_queue2", 1, msg_queue3, sizeof(msg_queue3));
/* 註冊發送消息回調 */
tx_queue_send_notify(&tx_queue3, queue3_send_notify);
}
任務1
void thread_0_entry(ULONG thread_input)
{
uint8_t i =0, key_flag = 0;
uint8_t data_single = 0;
while(1)
{
if (0 == key_flag)
{
if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY1_GPIO_Port, KEY1_Pin))
{
key_flag = 1;
}
}
else
{
if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin))
{
key_flag = 0;
/*按鍵1觸發,向隊列1發送消息*/
data_single++;
tx_queue_send(&tx_queue1, &data_single, TX_NO_WAIT);
}
}
tx_thread_sleep(20);
}
}
任務2
void thread_1_entry(ULONG thread_input)
{
uint8_t key_flag = 0;
struct S_DATA *pData;
pData = &data_package;
pData->id = 1;
pData->flag = 2;
pData->msg[0] = 3;
pData->msg[1] = 4;
while(1)
{
if (0 == key_flag)
{
if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY2_GPIO_Port, KEY2_Pin))
{
key_flag = 1;
}
}
else
{
if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin))
{
key_flag = 0;
/*按鍵2觸發,向隊列2發送消息*/
pData->id += 8;
pData->flag += 4;
pData->msg[0] += 2;
pData->msg[1] += 1;
tx_queue_send(&tx_queue2, &pData, TX_NO_WAIT);
}
}
tx_thread_sleep(20);
}
}
任務3
void thread_2_entry(ULONG thread_input)
{
UINT status;
uint8_t char_data;
ULONG long_data = 0;
struct S_DATA *buf_data;
while(1)
{
/* 向隊列3發送消息 */
long_data++;
tx_queue_send(&tx_queue3, &long_data, TX_NO_WAIT);
if (0 == (long_data & 7))
{
tx_queue_flush(&tx_queue3);
}
/* 接收隊列1消息 */
status = tx_queue_receive(&tx_queue1, &char_data, 1000);
if (TX_SUCCESS == status)
{
SEGGER_RTT_SetTerminal(0);
SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_GREEN"message queue1 receive data is %d\r\n", char_data);
}
/* 接收隊列2消息 */
status = tx_queue_receive(&tx_queue2, &buf_data, 1000);
if (TX_SUCCESS == status)
{
SEGGER_RTT_SetTerminal(1);
SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_YELLOW"message queue2 receive data is %d\t%d\t%d\t%d \r\n", \
buf_data->id, \
buf_data->flag, \
buf_data->msg[0], \
buf_data->msg[1]);
}
}
}
發送隊列消息回調功能
void queue3_send_notify(TX_QUEUE *input)
{
ULONG enqueued; // 隊列中的消息數
ULONG available_storage; // 隊列剩餘空間
tx_queue_info_get(&tx_queue3, TX_NULL, &enqueued, &available_storage, TX_NULL, TX_NULL, TX_NULL);
SEGGER_RTT_SetTerminal(2);
SEGGER_RTT_printf(0, "the number of messages in the queue3 %d\r\n", enqueued);
SEGGER_RTT_printf(0, "the queue3 remaining size %d\r\n", available_storage);
}
任務1演示結果
任務2演示結果
任務3演示結果
註:關於使用SEGGER_RTT列印功能可以參考這篇筆記://www.cnblogs.com/zzssdd2/p/14162382.html