用RUST寫流媒體伺服器實戰——rtmp chunk 深入解析

目前做的測試還不夠多,倒是發現了一些問題。chunk這個東西看了很久可能很多人還是不明白,說明一下,RTMP 協議除了3次握手數據,其它的,包括信令和媒體數據(音影片相關的數據),都會被封裝成chunk塊。



chunk size

初始化的chunk size要設置成128。

我一開始的chunk size設置成了4096,用ffplay播放流,發送connect信令的時候,總是會多出一個byte,導致amf解析失敗,用wireshark抓包,這個byte是沒有的,一開始認為wireshark是不會出錯的,以為tokio網路庫,於是換成了tcp基礎庫,這個byte還是存在,想了個笨方法,找到一個開源的rtmp伺服器,也列印出此信令,剛收到tcp數據的時候,這個byte也有,但是amf解析卻成功了,接下來就是把每一步的數據都列印出來,從解析chunk到解析amf. 看看這個byte究竟是在哪個步驟消失的,最後發現,這個byte是chunk的第一個byte,fmt+csid,初始化的chunk size不對。。



  • basic header
  • message header
  • extended timestamp
  • payload


basic header

 * Chunk Basic Header
 * The Chunk Basic Header encodes the chunk stream ID and the chunk
 * type(represented by fmt field in the figure below). Chunk type
 * determines the format of the encoded message header. Chunk Basic
 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
 * ID.
 * The bits 0-5 (least significant) in the chunk basic header represent
 * the chunk stream ID.
 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
 * field.
 *    0 1 2 3 4 5 6 7
 *   +-+-+-+-+-+-+-+-+
 *   |fmt|   cs id   |
 *   +-+-+-+-+-+-+-+-+
 *   Figure 6 Chunk basic header 1
 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
 * field. ID is computed as (the second byte + 64).
 *   0                   1
 *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|    0      | cs id - 64    |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 7 Chunk basic header 2
 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
 * this field. ID is computed as ((the third byte)*256 + the second byte
 * + 64).
 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   |fmt|     1     |         cs id - 64            |
 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 *   Figure 8 Chunk basic header 3
 * cs id: 6 bits
 * fmt: 2 bits
 * cs id - 64: 8 or 16 bits
 * Chunk stream IDs with values 64-319 could be represented by both 2-
 * byte version and 3-byte version of this field.

第一個byte的前兩個bit是format,有0,1,2,3四個值,這個四個值的作用是壓縮message header,詳細的會在下面說,後6個bit是chunk stream ID, 簡稱csid(關於這個欄位有坑,下面會解釋),6個bit的取值範圍為[0,63] ,0和1有特殊用途,2到63表示真正的csid,關於特殊值0和1:

  • 0 表示csid用 6+ 8個bit表示
  • 1 表示csid用 6 + 16個bit表示


      let mut csid = (byte & 0b00111111) as u32;
      match csid {
       0 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           csid = 64;
           csid += self.reader.read_u8()? as u32;
       1 => {
           if self.reader.len() < 1 {
               return Ok(UnpackResult::NotEnoughBytes);
           csid = 64;
           csid += self.reader.read_u8()? as u32;
           csid += self.reader.read_u8()? as u32 * 256;
       _ => {}

message header

下面說下message header, 這部分比較複雜,有四種類型,對應著basic header裡面的format欄位的0~3。

type 0

/* Type 0                                        */
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|                timestamp(3bytes)              |message length |
| message length (cont)(3bytes) |message type id| msg stream id |
|       message stream id (cont) (4bytes)       |


type 1

/* Type 1                                        */
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|                timestamp(3bytes)              |message length |
| message length (cont)(3bytes) |message type id|

省略了message stream id,使用上一個chunk的數據。

type 2

 /* Type 2                       */
  0                   1                   2
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
 |                timestamp(3bytes)              |

更絕了,省略了message stream id、message length和 message type id,這個也從前邊的chunk讀。

type 3

3 啥都沒有,全從前邊拿。

extended timestamp

這個欄位是可選的,佔用4個byte,如果message header裡面的timestamp欄位大於0xFFFFFF,則讀取這個欄位。


最後是payload,payload的長度由 message header裡面的message length決定。


  1. 讀取一個chunk的第一個byte,解析 format和chunk stream ID。
  2. 根據format解析message header:
    • 如果是0 則每個欄位都要從TCP流裡面解析出來。
    • 如果是1 則使用上一個chunk塊的message stream ID。
    • 如果是2 則使用上一個chunk塊的message stream id、message length和 message type id。
    • 如果是3 則使用上一個chunk塊的message stream id、message length、message type id以及timestamp。
  3. 根據timestamp值來決定是否讀取4個bytes的extendtimestamp。
  4. 根據message length讀取payload值,這裡有種情況比較特殊,有可能一塊payload數據被分成了2個或者多個chunk塊,在這一步裡面就需要將這些分割的payload 數據合成一個完整的chunk數據再返回。也就是說如果讀完payload數據後發現message length 不等於payload的長度,要回到步驟1從下一個chunk塊裡面繼續讀剩餘的payload數據,直到讀完為止。


    +--------+---------+-----+------------+------- ---+------------+
    |        | Chunk   |Chunk|Header Data |No.of Bytes|Total No.of |
    |        |Stream ID|Type |            | After     |Bytes in the|
    |        |         |     |            |Header     |Chunk       |
    |Chunk#1 | 	3      | 0   | delta: 1000| 32        | 44         |
    |        | 	       |     | length: 32,|           |            |
    |        |         |     | type: 8,   |           |            |
    |        |         |     | stream ID: |           |            |
    |        |         |     | 12345 (11  |           |            |
    |        |         |     | bytes)     |           |            |
    |Chunk#2 | 3       | 2   | 20 (3      | 32        | 36         |
    |        |         |     | bytes)     |           |            |
    |Chunk#3 | 4       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |
    |Chunk#4 | 3       | 3   | none (0    | 32        | 33         |
    |        |         |     | bytes)     |           |            |

注意:message header裡面的欄位復用是針對chunk stream ID的。

因此上面的情況,chunk2 可以復用 chunk1的message header,但是chunk 4不能復用chunk 3的,所以,在程式碼裡面要特殊處理,每個csid的message header都需要保存一份,每解析一個chunk,讀完basic header之後,需要把這個csid的上一個message header先恢復出來。



也就是說,可能一個chunk還沒讀完,這次的tcp數據就用完了,需要等下一次的數據,這種情況就要保留讀取各個欄位的狀態了。每一個讀取操作就應該設置一個標記,因此寫了下面的四個大狀態,message header裡面有4個小的狀態。

#[derive(Copy, Clone)]
enum ChunkReadState {
    ReadBasicHeader = 1,
    ReadMessageHeader = 2,
    ReadExtendedTimestamp = 3,
    ReadMessagePayload = 4,
    Finish = 5,

#[derive(Copy, Clone)]
enum MessageHeaderReadState {x'x
    ReadTimeStamp = 1,
    ReadMsgLength = 2,
    ReadMsgTypeID = 3,
    ReadMsgStreamID = 4,

例如: ReadExtendedTimestamp佔用4個bytes,但是讀到這裡的時候就還剩下2個bytes,就要保留這個狀態,下次從TCP裡面讀出新數據的時候從這個狀態開始。

最後rtmp chunk解析的rust完整實現在這裡
