【freertos】010-消息隊列概念及其實現細節
- 2022 年 6 月 5 日
- 筆記
- /label/FreeRTOS, /label/lzm, C語言, freeRTOS, IPC, 任務間通訊, 教程集合
前言
消息隊列是任務間通訊系列介紹的首篇筆記,因為學習完消息隊列的源碼實現後,訊號量、互斥量這些任務間通訊機制也相當於學完了,只剩下概念性的內容了。
參考:
10.1 消息隊列概念
消息隊列實任務間通訊機制中的一種。
其它還有二值訊號量、計數訊號量、互斥量和遞歸互斥量等等。
一個或多個任務往一個消息容器裡面發消息,其它一個或多個任務從這個消息容器裡面獲取消息,這樣實現通訊。
/* 該圖源自野火 */
freertos的消息隊列:
- 支援FIFO、支援LIFO也支援非同步讀寫工作方式。
- 支援超時機制。
- 支援不同長度(在節點長度範圍內)、任意類型的消息。
- 一個任務可對一個消息隊列讀、寫。
- 一個消息隊列支援被多個任務讀、寫。
- 隊列使用一次後自動從消息隊列中移除。
10.2 消息隊列的數據傳輸機制
隊列傳輸數據有兩種方式:
- 拷貝:把數據、把變數的值複製進隊列里。
- 引用:把數據、把變數的地址複製進隊列里。
而freertos的消息隊列機制就是拷貝,拷貝的方式有以下優點:
- 局部變數的值可以發送到隊列中,後續即使函數退出、局部變數被回收,也不會影響隊列中的數據。
- 無需分配buffer來保存數據,隊列中有buffer。
- 發送任務、接收任務解耦:接收任務不需要知道這數據是誰的、也不需要發送任務來釋放數據。
- 如果數據實在太大,可以選擇傳輸地址(即是拷貝地址),依然能實現傳輸引用的效果。
- 隊列的空間有FreeRTOS內核分配,無需上層應用維護。
- 無需考慮記憶體保護功能,因為拷貝的方式新數據的存儲區是由隊列組件提供的,無需擔心獲取消息的任務需要許可權訪問。
當然對比引用的方式也有劣勢:
- 拷貝數據相對拷貝引用來說要耗時。
- 需要更多記憶體,因為需要存儲數據副本。
10.3 消息隊列的阻塞訪問機制
只要拿到隊列句柄,任務和中斷都有許可權訪問消息隊列,但是也有阻塞限制。
寫消息時,如果消息隊列已滿,則無法寫入(覆蓋寫入除外),如果用戶設置的阻塞時間不為0,則任務會進入阻塞,直到該隊列有空閑空間給當前任務寫入消息或阻塞時間超時才解除阻塞。
上面說的「該隊列有空閑空間給當前任務寫入消息」是因為就算當前隊列有空間空間,也會優先安排阻塞在等待寫鏈表中的最高優先順序任務先寫入。如果任務優先順序相同,則先安排給最早開始等待的那個任務先寫。
讀消息時,機制和寫消息一樣,只是阻塞的條件是隊列裡面沒有消息。
數據傳輸和阻塞訪問機制都會在分析源碼時闡述。
10.4 消息隊列使用場景
消息隊列可以應用於發送不定長消息的場合,包括任務與任務間的消息交換。
隊列是FreeRTOS主要的任務間通訊方式,可以在任務與任務間、中斷和任務間傳送資訊。
發送到隊列的消息是通過拷貝方式實現的,這意味著隊列存儲的數據是原數據,而不是原數據的引用。
10.5 消息隊列控制塊
消息隊列的控制塊也是隊列控制塊,這個控制塊的數據結構除了被消息隊列使用,還被使用到二值訊號量、計數訊號量、互斥量和遞歸互斥量。
FreeRTOS的消息隊列控制塊由多個元素組成,當消息隊列被創建時,系統會為控制塊分配對應的記憶體空間,用於保存消息隊列的一些資訊,包括數據區位置、隊列狀態等等。
10.5.1 隊列控制塊源碼
隊列控制塊struct QueueDefinition
源碼:
/*
* Definition of the queue used by the scheduler.
* Items are queued by copy, not reference. See the following link for the
* rationale: //www.freertos.org/Embedded-RTOS-Queues.html
*/
typedef struct QueueDefinition
{
int8_t *pcHead; /*< Points to the beginning of the queue storage area. */
int8_t *pcTail; /*< Points to the byte at the end of the queue storage area. Once more byte is allocated than necessary to store the queue items, this is used as a marker. */
int8_t *pcWriteTo; /*< Points to the free next place in the storage area. */
union /* Use of a union is an exception to the coding standard to ensure two mutually exclusive structure members don't appear simultaneously (wasting RAM). */
{
int8_t *pcReadFrom; /*< Points to the last place that a queued item was read from when the structure is used as a queue. */
UBaseType_t uxRecursiveCallCount;/*< Maintains a count of the number of times a recursive mutex has been recursively 'taken' when the structure is used as a mutex. */
} u;
List_t xTasksWaitingToSend; /*< List of tasks that are blocked waiting to post onto this queue. Stored in priority order. */
List_t xTasksWaitingToReceive; /*< List of tasks that are blocked waiting to read from this queue. Stored in priority order. */
volatile UBaseType_t uxMessagesWaiting;/*< The number of items currently in the queue. */
UBaseType_t uxLength; /*< The length of the queue defined as the number of items it will hold, not the number of bytes. */
UBaseType_t uxItemSize; /*< The size of each items that the queue will hold. */
volatile int8_t cRxLock; /*< Stores the number of items received from the queue (removed from the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */
volatile int8_t cTxLock; /*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */
#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated; /*< Set to pdTRUE if the memory used by the queue was statically allocated to ensure no attempt is made to free the memory. */
#endif
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
} xQUEUE;
10.5.2 隊列控制塊成員剖析
在成員剖析時默認按消息隊列的作用去剖析。
int8_t *pcHead;
:
/* 該隊列存儲區的起始位置,對應第一個消息空間。*/
int8_t *pcHead;
int8_t *pcTail;
:
/* 消息隊列存儲區的結尾位置。
* 結合 pcHead 指針就是整個存儲區合法區域。*/
int8_t *pcHead;
int8_t *pcWriteTo;
:
/* 寫指針,指向存儲區中下一個空閑的空間。
* 隊列下次寫數據的位置,需要入隊時調用該指針寫入數據。*/
int8_t *pcWriteTo;
int8_t *pcReadFrom;
:
/* 讀指針,指向存儲區中下一個有效數據的空間。
* 隊列下次讀取數據的位置,需要出隊時調用該指針寫入數據。*/
int8_t *pcReadFrom;
UBaseType_t uxRecursiveCallCount;
:
/* 遞歸次數。
* 用於互斥量時使用,與 pcReadFrom 為聯合體。
* 記錄遞歸互斥量被調用的次數。 */
UBaseType_t uxRecursiveCallCount;
List_t xTasksWaitingToSend;
:
/* 等待發送的任務列表。
* 當隊列存儲區滿時,需要發送消息的任務阻塞時記錄到該鏈表。
* 按任務優先順序排序。 */
List_t xTasksWaitingToSend;
List_t xTasksWaitingToReceive;
:
/* 等待接收的任務列表。
* 當隊列存儲區為空時,需要獲取消息的任務阻塞時記錄到該鏈表。
* 按任務優先順序排序。 */
List_t xTasksWaitingToReceive;
volatile UBaseType_t uxMessagesWaiting;
:
/* 當前消息節點的個數。
* 即是當前有效消息數量。
* 二值訊號量、互斥訊號量時:表示有無訊號量可用。
* 計數訊號量時:有效訊號量個數。 */
volatile UBaseType_t uxMessagesWaiting;
UBaseType_t uxLength;
:
/* 當前隊列最大節點總數。
* 即是最多能存放多少個消息。
* 二值訊號量、互斥訊號量時:最大為1。
* 計數訊號量時:最大的訊號量個數。 */
UBaseType_t uxLength;
UBaseType_t uxItemSize;
:
/* 單個節點的大小。
* 單個消息的大小。
* 二值訊號量、互斥訊號量時:0。
* 計數訊號量時:0。 */
UBaseType_t uxItemSize;
volatile int8_t cRxLock;
:
/* 記錄出隊的數據項個數。
* 即是需要解除多少個阻塞在接收等待列表中的任務。 */
volatile int8_t cRxLock;
volatile int8_t cTxLock;
:
/* 記錄入隊的數據項個數。
* 即是需要解除多少個阻塞在發送等待列表中的任務。 */
volatile int8_t cTxLock;
10.5.3 cRxLock 和 cTxLock
當中斷服務程式操作隊列並且導致阻塞的任務解除阻塞時。
首先判斷該隊列是否上鎖:
- 如果沒有上鎖,則解除被阻塞的任務,還會根據需要設置上下文切換請求標誌;
- 如果隊列已經上鎖,則不會解除被阻塞的任務,取而代之的是,將xRxLock或xTxLock加1,表示隊列上鎖期間出隊或入隊的數目,也表示有任務可以解除阻塞了。
cRxLock
對應待出隊的個數。
cTxLock
對應待入隊的個數。
10.5.4 隊列控制塊數據結構圖
10.6 創建消息隊列
創建消息隊列是在系統上新建一個消息隊列,申請資源並初始化後返回句柄給用戶,用戶可以使用該隊列句柄訪問、操作該隊列。
10.6.1 創建消息隊列API說明
隊列的創建有兩種方法:靜態分配記憶體、動態分配記憶體。其區別就是隊列的記憶體來源是用戶提供的還是內核分配的。
主要分析動態分配記憶體。
函數原型:
QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength, UBaseType_t uxItemSize );
參數說明:
-
uxQueueLength
:隊列長度,最多能存放多少個數據(item)。 -
uxItemSize
:每個數據(item)的大小:以位元組為單位。 -
返回值:
- 非0:成功,返回句柄,以後使用句柄來操作隊列。
- NULL:失敗,因為記憶體不足。
10.6.2 創建消息隊列簡要步驟
- 參數校驗。
- 計算本次隊列需要的總記憶體。
- 分配隊列記憶體空間。
- 初始化隊列控制塊。
- 格式化隊列數據區。
- 返回隊列句柄。
10.6.3 創建消息隊列源碼
創建消息隊列這個API其實就是封裝了創建隊列xQueueGenericCreate()
這個通用API,類型為queueQUEUE_TYPE_BASE
。
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
#define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
#endif
其中隊列的類型有多種:
/* For internal use only. These definitions *must* match those in queue.c. */
#define queueQUEUE_TYPE_BASE ( ( uint8_t ) 0U ) // 隊列類型
#define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U ) // 隊列集合類型
#define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) // 互斥量類型
#define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) // 計數訊號量類型
#define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) // 二進位訊號量類型
#define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U ) // 遞歸互斥量類型
創建隊列函數源碼xQueueGenericCreateStatic()
:
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
const uint8_t ucQueueType )
{
Queue_t * pxNewQueue = NULL;
size_t xQueueSizeInBytes;
uint8_t * pucQueueStorage;
if( ( uxQueueLength > ( UBaseType_t ) 0 ) &&
/* 檢查需要的數據區size是否溢出限定範圍 */
( ( SIZE_MAX / uxQueueLength ) >= uxItemSize ) &&
/* 檢查本次隊列創建需要的空間是否溢出限定範圍 */
( ( SIZE_MAX - sizeof( Queue_t ) ) >= ( uxQueueLength * uxItemSize ) ) )
{
/* 計算數據區空間。
如果隊列創建的是不帶數據的,如訊號量、互斥量,則傳入參數時uxItemSize值應該被置為0。 */
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
/* 一次性分配隊列所需要的空間,包括隊列控制塊和數據區 */
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
if( pxNewQueue != NULL )
{
/* 找出數據區起始地址 */
pucQueueStorage = ( uint8_t * ) pxNewQueue;
pucQueueStorage += sizeof( Queue_t );
#if ( configSUPPORT_STATIC_ALLOCATION == 1 )
{
/* 如果系統使能了靜態創建功能,就需要標記當前隊列是動態創建,記憶體有內核管理,以防用戶刪除。 */
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
/* 初始化這個隊列 */
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
else
{
traceQUEUE_CREATE_FAILED( ucQueueType );
mtCOVERAGE_TEST_MARKER();
}
}
else
{
configASSERT( pxNewQueue );
mtCOVERAGE_TEST_MARKER();
}
/* 返回隊列起始地址,便是隊列句柄 */
return pxNewQueue;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
初始化隊列函數源碼prvInitialiseNewQueue()
:
- 小筆記:初始化隊列,看源碼實現就知道控制塊和數據區物理記憶體是可以分開的,但是在創建消息隊列這個API裡面實現是連續的。
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t * pucQueueStorage,
const uint8_t ucQueueType,
Queue_t * pxNewQueue )
{
/* 防止編譯時警告未使用 */
( void ) ucQueueType;
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* 如果沒有數據區(如訊號量、互斥量等等),就需要把隊列中的pcHead指回當前隊列控制塊起始地址,表明當前隊列不含數據區。 */
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
/* 如果當前隊列含有數據區,則把 */
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
/* 保存當前隊列成員數量 */
pxNewQueue->uxLength = uxQueueLength;
/* 保存當前隊列每個成員的最大size */
pxNewQueue->uxItemSize = uxItemSize;
/* 隊列格式化。(組成一個介面是因為不僅僅在這裡用到重置隊列的功能) */
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 )
{
/* 記錄當前隊列類型。一般用於調試、查棧使用。 */
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */
#if ( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL;
}
#endif /* configUSE_QUEUE_SETS */
traceQUEUE_CREATE( pxNewQueue );
}
重置隊列函數xQueueGenericReset()
:
- 專門用於函數據區的隊列,如消息隊列。
BaseType_t xQueueGenericReset( QueueHandle_t xQueue,
BaseType_t xNewQueue )
{
BaseType_t xReturn = pdPASS;
Queue_t * const pxQueue = xQueue;
/* 參數校驗 */
configASSERT( pxQueue );
if( ( pxQueue != NULL ) &&
( pxQueue->uxLength >= 1U ) && /* 隊列成員數不能小於1,要不然算參數校驗失敗 */
( ( SIZE_MAX / pxQueue->uxLength ) >= pxQueue->uxItemSize ) ) /* 隊列size溢出檢查 */
{
taskENTER_CRITICAL(); /* 進入任務臨界 */
{
/* 保存整個隊列尾部的地址,和pxQueue->pcHead結合看,就是這個隊列的合法空間首尾 */
pxQueue->u.xQueue.pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
/* 重置當前有效消息數量 */
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
/* 重置寫指針,指向第一個隊列成員 */
pxQueue->pcWriteTo = pxQueue->pcHead;
/* 重置讀指針,指向最後一個隊列成員。因為下次讀前要先偏移讀指針。 */
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize );
/* 重置消息隊列讀鎖:開鎖狀態 */
pxQueue->cRxLock = queueUNLOCKED;
/* 重置消息隊列寫鎖:開鎖狀態 */
pxQueue->cTxLock = queueUNLOCKED;
if( xNewQueue == pdFALSE ) /* 重置已經在使用的隊列 */
{
/* 因為重置隊列相當於清空隊列裡面的數據,隊列有空位可寫入,所以可以解除一個寫阻塞任務 */
/* 如果有任務因為當前隊列而寫阻塞的,可以解除 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
/* 解除一個寫阻塞任務 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
/* 如果內部解鎖了個比當前優先順序還高的任務,就觸發一次任務切換。(當然,實際自信還是在退出任務臨界才會執行) */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else /* 重置的是一個新的隊列 */
{
/* 直接初始化寫阻塞任務鏈表和讀阻塞任務鏈表. */
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL(); /* 退出任務臨界 */
}
else
{
xReturn = pdFAIL;
}
/* 前面if裡面參數校驗失敗會直接斷言 */
configASSERT( xReturn != pdFAIL );
return xReturn;
}
10.6.4 消息隊列數據結構圖
10.7 發送消息
任務或者中斷服務程式都可以給消息隊列發送消息。
中斷中發送消息不可阻塞。要麼直接返回,要麼覆蓋寫入。
任務發送消息時,如果隊列未滿或者允許覆蓋入隊,FreeRTOS會將消息拷貝到消息隊列隊尾或隊列頭,否則,會根據用戶指定的阻塞超時時間進行阻塞。直到該隊列有空閑空間給當前任務寫入消息或阻塞時間超時才解除阻塞。
發送消息的API分任務和中斷專屬,中斷專用的API都帶FromISR
後綴。
因為本系列筆記主要記錄源碼實現,API的使用不會詳細列舉。
10.7.1 發送消息API
/* 往隊列尾部寫入數據。等同於xQueueSendToBack */
BaseType_t xQueueSend(QueueHandle_t xQueue, const void *pvItemToQueue, TickType_t xTicksToWait);
/* 往隊列尾部寫入數據。等同於xQueueSend */
BaseType_t xQueueSendToBack(QueueHandle_t xQueue, const void *pvItemToQueue, TickType_t xTicksToWait);
/* 往隊列尾部寫入數據。中斷專用 */
BaseType_t xQueueSendToBackFromISR(QueueHandle_t xQueue, const void *pvItemToQueue, BaseType_t *pxHigherPriorityTaskWoken);
/* 往隊列頭部寫入數據 */
BaseType_t xQueueSendToFront(QueueHandle_t xQueue, const void *pvItemToQueue, TickType_t xTicksToWait);
/* 往隊列頭部寫入數據。中斷專用 */
BaseType_t xQueueSendToFrontFromISR(QueueHandle_t xQueue, const void *pvItemToQueue, BaseType_t *pxHigherPriorityTaskWoken);
參數說明:
-
xQueue
:隊列句柄。 -
pvItemToQueue
:數據指針,這些數據的值會被複制進隊列。 -
xTicksToWait
:最大阻塞時間,單位Tick Count。- 如果被設為0,無法寫入數據時函數會立刻返回;
- 如果被設為
portMAX_DELAY
,則會一直阻塞直到有空間可寫
-
返回值:
pdPASS
:數據成功寫入了隊列errQUEUE_FULL
:寫入失敗,因為隊列滿了。
10.7.2 發送消息實現簡要步驟
-
參數校驗。
-
檢查當前隊列是否有空閑空間可寫入。
-
進入臨界。
-
有空間可寫入:
- 直接寫入。
- 檢查下是否有任務阻塞在當前隊列寫阻塞鏈表中,有就解鎖一個最高優先順序、最早開始等待的任務。
- 退出臨界。
-
沒空間可寫入:進入阻塞處理。
- 不需要阻塞,就退出臨界並返回。
- 開始阻塞超時計時。
- 退出臨界。(可能會切到其它任務或中斷)
- 掛起調度器。
- 再次檢查下是否有空間可寫,是否超時。
- 需要阻塞就計算下當前任務的喚醒時間,記錄到任務事件狀態節點資訊中,把當前任務從就緒鏈表抽離,插入到延時鏈表和當前隊列的寫阻塞任務鏈表中。
- 切走任務,等待喚醒。
-
10.7.3 發送消息源碼分析
往隊列里發消息的API(中斷專用除外),都是封裝xQueueGenericSend()
函數而來的,所以我們直接分析該函數實現即可。
需要注意的是,如果發送消息前,調度器被掛起了,則這個消息不能配置為阻塞式的,因為如果掛起調度器後使用阻塞式寫入隊列,會觸發斷言。
在這裡可以拓展下,如果沒有這個斷言校驗,隊列已滿,則會在當前任務一直死循環,直至有中斷服務恢復調度器或讀取當前隊列的消息,當前任務才能跑出這個坑。
xQueueGenericSend()
:
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
/* 傳入的隊列句柄不能為空 */
configASSERT( pxQueue );
/* 如果寫入隊列的數據為空,就說明調用當前API的不是一個消息隊列,而是不含數據區的訊號量、互斥量這些IPC,所以隊列成員size必須為0 */
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
/* 如果是覆蓋寫入,這個功能默認只能在隊列成員只有1個的情況下使用 */
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
/* 如果調度器被掛起,則不能進入阻塞。 */
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif
/* 使用循環邏輯,是為了解除阻塞後能檢查一下能否可以寫入。 */
for( ; ; )
{
taskENTER_CRITICAL(); /* 進入臨界,因為下面操作可能會涉及到全局資源,如那幾個任務鏈表 */
{
/* 隊列有空閑空間或需要強制寫入隊列,方可寫入 */
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
traceQUEUE_SEND( pxQueue );
{
const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting;
/* 拷貝數據到隊列里 */
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
if( pxQueue->pxQueueSetContainer != NULL ) /* 隊列集合 */
{
if( ( xCopyPosition == queueOVERWRITE ) && ( uxPreviousMessagesWaiting != ( UBaseType_t ) 0 ) )
{
/* 如果當前隊列裡面有數據,且本次寫入是覆蓋寫入,就不需要通知隊列集了,因為隊列集已經被通知過。 */
mtCOVERAGE_TEST_MARKER();
}
else if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE ) /* 通知隊列集當前隊列有數據了 */
{
/* 觸發任務切換。只是觸發,實際切換需要到退出臨界後才執行。 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else /* 不是隊列集 */
{
/* 有任務阻塞在讀阻塞鏈表,現在隊列有數據了,需要解鎖一個任務 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
/* 從讀阻塞鏈表中解除一個最高優先順序且最先進入阻塞的任務。
即把這個解除阻塞的任務的事件節點從當前隊列的阻塞鏈表中抽離,把狀態節點從掛起鏈表或延時鏈表重新插入到就緒鏈表或掛起的就緒鏈表中。
如果解除阻塞的任務比當前在跑任務優先順序還高,就返回pdTRUE */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 觸發任務切換。只是觸發,實際切換需要到退出臨界後才執行。 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else if( xYieldRequired != pdFALSE )
{
/* 如果是釋放互斥量時優先順序繼承機制觸發當前任務優先順序回落,就緒鏈表中有更高優先順序的任務,則觸發任務切換。只是觸發,實際切換需要到退出臨界後才執行。 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL(); /* 退出臨界 */
return pdPASS; /* 返回成功 */
}
else /* 本次不能寫入,則檢查、準備進入阻塞處理 */
{
if( xTicksToWait == ( TickType_t ) 0 ) /* 不需要阻塞 */
{
taskEXIT_CRITICAL(); /* 退出臨界 */
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL; /* 返回寫入失敗 */
}
else if( xEntryTimeSet == pdFALSE ) /* 需要阻塞。第一次循環,需要記錄當前時間,開始計時阻塞超時。 */
{
/* 備份當前系統節拍 */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE; /* 標記已開始記錄了 */
}
else
{
/* 進入時間已經設定 */
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL(); /* 退出臨界 */
/* 退出臨界後系統會先處理在臨界期觸發的被屏蔽的中斷服務,如任務切換的中斷服務、其它中斷服務等等。 */
vTaskSuspendAll(); /* 又調度回到當前任務了,掛起調度器,繼續幹活 */
prvLockQueue( pxQueue ); /* 當前隊列上鎖,以免有中斷服務操作當前隊列,打亂老子的節奏 */
/* 檢查阻塞時間是否已經超時 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) /* 還沒超時呢,繼續等唄 */
{
if( prvIsQueueFull( pxQueue ) != pdFALSE ) /* 納尼,隊列還是滿的,寫不進去啊 */
{
traceBLOCKING_ON_QUEUE_SEND( pxQueue );
/* 我還是去這個隊列裡面的寫阻塞鏈表裡面排個隊吧。還可以插隊,不過只能插到比自己優先順序低的任務前面。 */
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
/* 放開當前隊列的控制權 */
prvUnlockQueue( pxQueue );
/* 恢復調度器 */
if( xTaskResumeAll() == pdFALSE )
{
/* 如果在恢復調度器裡面沒有觸發過調度,那這裡需要觸發一次調度,因為當前任務已經處於阻塞態了,怎麼滴也要觸發一次調度切走。 */
portYIELD_WITHIN_API();
}
}
else /* 隊列有空位,趕緊寫 */
{
/* 解鎖當前隊列,進入下一個循環,看看能不能搶到寫入許可權 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll(); /* 恢復調度器 */
}
}
else /* 超時都沒等到寫入的許可權 */
{
/* 解鎖隊列 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll(); /* 恢復調度器 */
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL; /* 寫入失敗 */
}
} /*lint -restore */
}
寫入隊列的API源碼prvCopyDataToQueue()
:(臨界中調用)
static BaseType_t prvCopyDataToQueue( Queue_t * const pxQueue,
const void * pvItemToQueue,
const BaseType_t xPosition )
{
BaseType_t xReturn = pdFALSE;
UBaseType_t uxMessagesWaiting;
/* 當前函數需要在臨界里被調用 */
uxMessagesWaiting = pxQueue->uxMessagesWaiting;
if( pxQueue->uxItemSize == ( UBaseType_t ) 0 ) /* 非隊類型 */
{
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) /* 互斥量 */
{
/* 互斥量類型調用該函數就是釋放互斥量的意思 */
/* 釋放互斥量,需要處理優先順序繼承機制,回落到基優先順序 */
xReturn = xTaskPriorityDisinherit( pxQueue->u.xSemaphore.xMutexHolder );
/* 標記互斥量已經被解鎖 */
pxQueue->u.xSemaphore.xMutexHolder = NULL;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
#endif /* configUSE_MUTEXES */
}
else if( xPosition == queueSEND_TO_BACK ) /* 往隊列尾部寫入 */
{
/* 按隊列屬性寫入 */
( void ) memcpy( ( void * ) pxQueue->pcWriteTo, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );
/* 更新隊列寫指針 */
pxQueue->pcWriteTo += pxQueue->uxItemSize;
if( pxQueue->pcWriteTo >= pxQueue->u.xQueue.pcTail )
{
/* 如果本次寫入的數據時隊列最後一個成員,就需要把當前隊列寫指針重置回首個隊列成員。和ringbuffer原理類似 */
pxQueue->pcWriteTo = pxQueue->pcHead;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else /* 往有效隊列頭部寫入 */
{
/* 按列屬性寫入,讀指針就是有效隊列頭 */
( void ) memcpy( ( void * ) pxQueue->u.xQueue.pcReadFrom, pvItemToQueue, ( size_t ) pxQueue->uxItemSize );
/* 讀指針往前推 */
pxQueue->u.xQueue.pcReadFrom -= pxQueue->uxItemSize;
if( pxQueue->u.xQueue.pcReadFrom < pxQueue->pcHead )
{
/* 讀指針往前推時溢出後需要回溯到隊列最後一個成員 */
pxQueue->u.xQueue.pcReadFrom = ( pxQueue->u.xQueue.pcTail - pxQueue->uxItemSize );
}
else
{
mtCOVERAGE_TEST_MARKER();
}
if( xPosition == queueOVERWRITE )
{
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
/* 如果是覆蓋寫入,那當前隊列有效成員數量維持不變 */
--uxMessagesWaiting;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
/* 更新當前隊列有效成員數量 */
pxQueue->uxMessagesWaiting = uxMessagesWaiting + ( UBaseType_t ) 1;
return xReturn;
}
優先順序繼承機制概念、實現原理及其源碼在互斥量章節的筆記講解。
10.7.5 中斷專用的發送消息API
中斷專用的發送消息API比普通的發送消息API佛系了。
區別就是:中斷專用的沒有阻塞機制。
如果隊列有空閑空間,或本次是強制寫入,就把數據寫入。
-
寫入後如果隊列沒有上鎖,就更新當前隊列資訊,解鎖阻塞在讀阻塞隊列的最高優先順序、最早等待的一個任務。
-
如果隊列上鎖了,就用隊列中的
pxQueue->cTxLock
記錄當前隊列入隊了一個數據,在調用prvUnlockQueue()
解鎖時更新當前隊列資訊,解鎖阻塞在讀阻塞隊列的最高優先順序、最早等待的一個任務。
如果隊列沒有空閑空間,又不是強制寫入,就直接退出。
10.8 接收消息
當任務從隊列中讀取消息時,如果隊列中有消息,可以讀取並返回。
如果隊列中沒有消息,需要進入阻塞處理,在阻塞超時前,有其他任務或中斷服務往這個隊列裡面寫消息了,且當前任務時這個隊列中阻塞在讀阻塞鏈表中的最高優先順序、最先等待的任務,就解鎖該任務,否則還會一直阻塞到超時才喚醒當前任務。
10.8.1 接收消息API
/* 從隊列中讀取數據。 */
BaseType_t xQueueReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait );
/* 從隊列中讀取數據。中斷專屬 */
BaseType_t xQueueReceiveFromISR(QueueHandle_t xQueue, void *pvBuffer, BaseType_t *pxTaskWoken);
參數說明:
-
xQueue
:隊列句柄。 -
pvBuffer
:存儲接收數據的指針,其有效空間需要按照當前隊列屬性設定。 -
xTicksToWait
:如果隊列空則無法讀出數據,可以讓任務進入阻塞狀態,xTicksToWait表示阻塞的最大時間,單位:Tick Count。- 如果被設為0,無法讀出數據時函數會立刻返回;
- 如果被設為
portMAX_DELAY
,則會一直阻塞直到有數據可寫。
-
返回值:
pdPASS
:從隊列讀出數據入;errQUEUE_EMPTY
:讀取失敗,因為隊列空了。
10.8.2 接收消息簡要步驟
10.8.3 接收消息源碼
xQueueReceive()
:
BaseType_t xQueueReceive( QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
/* 隊列句柄不能為空 */
configASSERT( ( pxQueue ) );
/* 如果數據區回傳地址為空,只能是不含數據區的IPC(訊號量、互斥量等) */
configASSERT( !( ( ( pvBuffer ) == NULL ) && ( ( pxQueue )->uxItemSize != ( UBaseType_t ) 0U ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
/* 調度器掛起後,不能已阻塞式調用當前API */
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif
/* 使用循環邏輯,是為了解除阻塞後能檢查一下能否可以讀取。 */
for( ; ; )
{
taskENTER_CRITICAL(); /* 進入臨界 */
{
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
/* 當前隊列有數據可讀 */
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
/* 出隊。 */
prvCopyDataFromQueue( pxQueue, pvBuffer );
traceQUEUE_RECEIVE( pxQueue );
/* 有效隊列成員個數更新 */
pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;
/* 如果有任務阻塞在當前隊列的寫阻塞鏈表中,就解鎖一個,讓其寫入。 */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
/* 把這個解除阻塞的任務從當前隊列的寫阻塞鏈表中解除,並把該任務從延時鏈表或掛起鏈表中恢復到就緒鏈表或掛起的就緒鏈表中 */
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
/* 解鎖的任務比當前任務優先順序更加高,需要觸發任務調度。 */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
/* 退出臨界 */
taskEXIT_CRITICAL();
return pdPASS; /* 返回讀取成功 */
}
else /* 隊列為空呢 */
{
if( xTicksToWait == ( TickType_t ) 0 ) /* 不需要阻塞 */
{
/* 退出臨界並返回讀取失敗 */
taskEXIT_CRITICAL();
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE ) /* 進入阻塞,首次循環需要開始計時 */
{
/* 獲取當前系統節拍 */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE; /* 標記已經開始計時 */
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL(); /* 退出臨界 */
/* 退出臨界後系統會先處理在臨界期觸發的被屏蔽的中斷服務,如任務切換的中斷服務、其它中斷服務等等。 */
vTaskSuspendAll(); /* 有回到了當前任務。掛起調度器 */
prvLockQueue( pxQueue ); /* 隊列上鎖 */
/* 檢查是否已經超時。 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) /* 還沒超時 */
{
if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) /* 如果隊列還沒有數據,需要繼續阻塞 */
{
traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );
/* 我還是去這個隊列裡面的讀阻塞鏈表裡面排個隊吧。還可以插隊,不過只能插到比自己優先順序低的任務前面。 */
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
prvUnlockQueue( pxQueue ); /* 解鎖當前隊列 */
if( xTaskResumeAll() == pdFALSE ) /* 恢復調度器 */
{
/* 如果在恢復調度器時沒有調度過,這裡必須手動觸發一次調度。否則會在當前這個坑裡一直跑,直到有中斷服務往當前隊列里發消息,或者有更高優先順序的任務被解除阻塞,或者系統節拍中有同優先順序任務被解鎖(就緒鏈表中還有大於2個及其以上同優先順序的任務)(開啟時間片的前提下)才會跳出這個坑。 */
portYIELD_WITHIN_API();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* The queue contains data again. Loop back to try and read the
* data. */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else /* 已經超時了 */
{
/* 解鎖隊列 */
prvUnlockQueue( pxQueue );
/* 恢復調度器 */
( void ) xTaskResumeAll();
if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) /* 再次判斷下是否真的沒有數據,現在有數據還來得及 */
{
/* 真的沒有數據,返回讀取失敗吧。 */
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
} /*lint -restore */
}
出隊函數prvCopyDataFromQueue()
:
static void prvCopyDataFromQueue( Queue_t * const pxQueue,
void * const pvBuffer )
{
if( pxQueue->uxItemSize != ( UBaseType_t ) 0 ) /* 只有帶數據區的IPC才能調用 */
{
/* 偏移到下一個隊列成員 */
pxQueue->u.xQueue.pcReadFrom += pxQueue->uxItemSize;
if( pxQueue->u.xQueue.pcReadFrom >= pxQueue->u.xQueue.pcTail )
{
/* 讀指針溢出的話需要回溯 */
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
/* 拷貝出數據 */
( void ) memcpy( ( void * ) pvBuffer, ( void * ) pxQueue->u.xQueue.pcReadFrom, ( size_t ) pxQueue->uxItemSize );
}
}
10.9 窺探消息
就是只讀取數據,不刪除該數據。
其源碼和xQueueReceive()
差不多,只是數據不刪除,讀指針也不偏移,有效個數也不減少。
BaseType_t xQueuePeek( QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait );
BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue,
void * const pvBuffer,
BaseType_t * const pxHigherPriorityTaskWoken );
10.10 隊列查詢
隊列查詢主要是操作隊列控制塊中的資訊。
10.10.1 查詢隊列當前有效數據個數
UBaseType_t uxQueueMessagesWaiting( const QueueHandle_t xQueue )
{
UBaseType_t uxReturn;
configASSERT( xQueue );
taskENTER_CRITICAL();
{
/* 獲取隊列有效成員個數 */
uxReturn = ( ( Queue_t * ) xQueue )->uxMessagesWaiting;
}
taskEXIT_CRITICAL();
return uxReturn;
}
10.10.2 查詢隊列當前可以空間個數
UBaseType_t uxQueueSpacesAvailable( const QueueHandle_t xQueue )
{
UBaseType_t uxReturn;
Queue_t * const pxQueue = xQueue;
configASSERT( pxQueue );
taskENTER_CRITICAL();
{
/* 總個數減去有效個數 */
uxReturn = pxQueue->uxLength - pxQueue->uxMessagesWaiting;
}
taskEXIT_CRITICAL();
return uxReturn;
}
10.11 刪除消息隊列
隊列刪除函數是根據消息隊列句柄直接刪除的,刪除之後這個消息隊列的所有資訊都會被系統回收清空,而且不能再次使用這個消息隊列了。
直接上源碼:
void vQueueDelete( QueueHandle_t xQueue )
{
Queue_t * const pxQueue = xQueue;
/* 隊列必須存在 */
configASSERT( pxQueue );
traceQUEUE_DELETE( pxQueue );
#if ( configQUEUE_REGISTRY_SIZE > 0 )
{
/* 如果開啟了隊列註冊表功能,也需要從隊列註冊表中取出當前隊列的記錄 */
vQueueUnregisterQueue( pxQueue );
}
#endif
#if ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) )
{
/* 如果只開啟了動態記憶體功能,就是直接釋放當前隊列資源 */
vPortFree( pxQueue );
}
#elif ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) )
{
/* 如果動態記憶體和靜態記憶體都開啟了,就需要區分當前隊列的記憶體資源來源 */
if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE ) /* 動態創建 */
{
/* 直接回收 */
vPortFree( pxQueue );
}
else /* 靜態記憶體,由用戶回收資源 */
{
mtCOVERAGE_TEST_MARKER();
}
}
#else /* if ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) ) */
{
/* 靜態分配,只能由用戶回收。 */
( void ) pxQueue;
}
#endif /* configSUPPORT_DYNAMIC_ALLOCATION */
}
10.12 消息隊列使用注意
在使用freertos提供的消息隊列組件時,需要注意以下幾點:
- 使用xQueueSend()、xQueueSendFromISR()、xQueueReceive()等這些函數之前應先創建需消息隊列,並根據隊列句柄進行操作。
- 要明白寫入隊列採用的邏輯時FIFO還是LIFO,使用對應的API。
- 在獲取隊列中的消息時候,必須要定義一個存儲讀取數據的地方,並且該數據區域大小不小於消息大小,否則,很可能引發地址非法的錯誤。
- freertos的數據流是拷貝方式實現的,如果消息過大,建議使用拷貝引用。
- 隊列獨立在內核中,不屬於任何一個任務。
小結
學習,重在理解,懂得底層原理,上層特性、特點即可推理。