手繪模型圖帶你認識Kafka服務端網路模型

摘要:Kafka中的網路模型就是基於主從Reactor多執行緒進行設計的。

本文分享自華為雲社區《圖解Kafka服務端網路模型》,作者:石臻臻的雜貨鋪 。

Kafka中的網路模型就是基於主從Reactor多執行緒進行設計的, 在整體講述Kafka網路模型之前,我們現在按照源碼中的相關類來講解一下他們分別都是用來做什麼的。

關鍵類解析

SocketServer

這個類是網路通訊的核心類,它持有這Acceptor和 Processor對象。

ConnectionQuotas

這個是控制連接數配額的類,

涉及到的Broker配置有:

AbstractServerThread

AbstractServerThread 類:這是Acceptor執行緒和Processor執行緒的抽象基類,它定義了一個抽象方法wakeup() ,主要是用來喚醒Acceptor 執行緒和 Processor 對應的Selector的, 當然還有一些共用方法

Acceptor 和 Processor

Acceptor 執行緒類:繼承自AbstractServerThread, 這是接收和創建外部 TCP 連接的執行緒。每個 SocketServer 實例一般會創建一個 Acceptor 執行緒(如果listeners配置了多個就會創建多個Acceptor)。它的唯一目的就是創建連接,並將接收到的 SocketChannel(SocketChannel通道用於傳輸數據) 傳遞給下游的 Processor 執行緒處理,Processor主要是處理連接之後的事情,例如讀寫I/O。

涉及到的Broker配置有:

Processor 執行緒類:這是處理單個TCP 連接上所有請求的處理執行緒。每個Acceptor 實例創建若干個(num.network.threads)Processor 執行緒。Processor 執行緒負責將接收到的 SocketChannel(SocketChannel通道用於傳輸數據。), 註冊讀寫事件,當數據傳送過來的時候,會立即讀取Request數據,通過解析之後, 然後將其添加到 RequestChannel 的 requestQueue 隊列上,同時還負責將 Response 返還給 Request 發送方。

涉及到的Broker配置有:

簡單畫了一張兩個類之間的關係圖

    1. 這兩個類都是 AbstractServerThead的實現類,超類是Runnable 可運行的。
    2. 每個Acceptor持有num.network.threads個 Processor 執行緒, 假如配置了多個listeners,那麼總共Processor執行緒數是 listeners*num.network.threads.
    3. Acceptor 創建的是ServerSocketChannel通道,這個通道是用來監聽新進來的TCP鏈接的通道,
      通過serverSocketChannel.accept()方法可以拿到SocketChannel通道用於傳輸數據。
    4. 每個Processor 執行緒都有一個唯一的id,並且通過Acceptor拿到的SocketChannel會被暫時放入到newConnections隊列中
    5. 每個Processor 都創建了自己的Selector
    6. Processor會不斷的從自身的newConnections隊列裡面獲取新SocketChannel,並註冊讀寫事件,如果有數據傳輸過來,則會讀取數據,並解析成Request請求。

既然兩個都是可執行執行緒,那我們看看兩個執行緒的run方法都做了哪些事情

Acceptor.run

def run(): Unit = {
    //將serverChannel 註冊到nioSelector上,並且對 Accept事件感興趣:表示伺服器監聽到了客戶連接,那麼伺服器可以接收這個連接了
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          //返回感興趣的事件數量  這裡是感興趣的是SelectionKey.OP_ACCEPT,監聽到新的鏈接
          val ready = nioSelector.select(500)
          if (ready > 0) {
            //獲取所有就緒通道
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            //遍歷所有就緒通道
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                //只處理   Accept事件,其他的事件則拋出異常,ServerSocketChannel是 監聽Tcp的鏈接通道
                if (key.isAcceptable) {
                  //根據Key 拿到SocketChannle = serverSocketChannel.accept(),然後再遍歷
                  accept(key).foreach { socketChannel =>
 
                    //將socketChannel分配給我們的 processor來處理,如果有多個socketChannel 則按照輪訓分配的原則
                    //如果一個processor 中能夠處理的newconnection 隊列滿了放不下了,則找下一個
                    // 如果所有的都放不下,則會一直循環直到有processor能夠處理。

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      //輪訓每個processors來處理
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          省略
        }
      }
    } finally {
     省略
    }
  }

1、將ServerSocketChannel通道註冊到nioSelector 上,並關注事件SelectionKey.OP_ACCEPT

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

2、while循環,持續阻塞監聽事件,超時時間500ms

// 阻塞查詢Selector是否有監聽到新的事件
    val ready = nioSelector.select(500)
    // 如果有事件,則查詢具體的事件和通道
    if(ready>0>{
         //獲取所有就緒事件準備處理
        val keys = nioSelector.selectedKeys()
    }

3、遍歷剛剛監聽到的事件, 如果該SelectionKey不包含OP_ACCEPT(建立連接)事件,則拋出異常,通常不會出現這個異常。

Unrecognized key state for acceptor thread

4、如果SelectionKey包含OP_ACCEPT(建立連接)事件,則可以通過這個SelectionKey拿到serverSocketChannel,通過serverSocketChannel 拿到socketChannel,並且將SocketChannel設置為非阻塞模式。

  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
 // 調用accept方法就可以拿到ScoketChannel了。
  val socketChannel = serverSocketChannel.accept()
   //設置為非阻塞模式 就可以在非同步模式下調用connect(), read() 和write()了。
  socketChannel.configureBlocking(false)

5、接下來,把上面拿到的SocketChannel以遍歷的形式給Acceptor下面的Procesor, 讓Processor來執行後面的處理。分配的體現形式是, 將拿到的SocketChannel保存在Processor中的newConnections阻塞隊列中,這個newConnections上限是20,在程式碼裡面寫死了的,也就是說一個Processor同時最多只能處理20個連接, 那麼所有的Processor能處理的最大連接就是Processor數量 * 20;如果你的連接請求並發度很高,可以嘗試調大num.network.threads

6、最後,如果newConnections隊列放入了一個新的SocketChannel,則會調用一下對應Processor實例的wakeup()方法。

Procesor.run

override def run(): Unit = {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          // 將之前監聽到的TCP鏈接(暫時保存在newConnections中) 開始註冊監聽OP_READ事件到每個Processor的 KSelector選擇器中。
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          //在不阻塞的情況下對每個連接執行任何 I/O 操作。這包括完成連接、完成斷開連接、啟動新發送或在進行中的發送或接收上取得進展。
          // 當此調用完成時,用戶可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()檢查已完成的發送、接收、連接或斷開連接。
          poll()
          // 把請求解析後放到 requestChannels 隊列中,非同步處理
          processCompletedReceives()
          //處理已經發送完成的請求
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. These exceptions are caught and
          // processed by the individual methods above which close the failing channel and continue processing other
          // channels. So this catch block should only ever see ControlThrowables.
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }
  1. configureNewConnections(): 之前Acceptor監聽到的SocketChannel保存在Procesor中的newConnections阻塞隊列中, 現在開始將newConnections阻塞隊列一個個取出來,向Procesor的Selector註冊SocketChannel通道,並且感興趣的事件為SelectionKey.OP_READ讀事件。
  2. processNewResponses() : 去Processor裡面的無邊界阻塞隊列responseQueue裡面獲取RequestChannel.Response數據, 如果有數據並且需要返回Response的話, 則通過channel返回數據. 具體的Channel是根據connectionId 獲取之前構建的KafkaChannel, KafkaChannel則會通過監聽SelectionKey.OP_WRITE。然後調用writeTo方法。
    至於responseQueue這個隊列是什麼時候入隊的,我們後面再分析
  3. poll(): 這個方法裡面執行的就很多了, 這個方法底層調用的是selector.poll(); 將監聽到的事件批量處理,它才是執行I/O請求的最終地方, 它正對每個連接執行任何的I/O操作,這包括了 完成連接、完成斷開連接、啟動新發送等等。
    像校驗身份資訊,還有handshake等等這些也都是在這裡執行的。
  4. processCompletedReceives(): 處理所有completedReceives(已完成接收的請求)進行接下來的處理, 處理的方式是解析一下收到的請求,最終調用了requestChannel.sendRequest(req). 也就是說所有的請求最終通過解析放入到了RequestChannel中的requestQueue阻塞隊列中, 這個阻塞隊列的大小為queued.max.requests默認500;表示的是在阻塞網路執行緒之前,數據平面允許的排隊請求數
    PS: 這個completedReceives 是在 poll()方法中添加的元素。
  5. processCompletedSends(): 它負責處理 Response 的回調邏輯,通過遍歷completedSends(已完成發送)集合 可以從inflightResponses中移除並拿到response對象,然後再調用回調邏輯。
    PS: 這個completedSends 是在 poll()方法中添加的元素。
  6. processDisconnected(): 處理斷開鏈接的情況, connectionQuotas連接限流減掉這個鏈接,inflightResponses也移除對應連接。
  7. closeExcessConnections(): 關閉超限連接 ,當總連接數 >max.connections && (inter.broker.listener.name!=listener|| listeners 數量==1) 則需要關閉一些連接.
    簡單來說就是:就算Broker已經達到了最大連接數的限制了, 也應該允許 broker之間監聽器上的連接, 這種情況下,將會關閉另外一個監聽器上最近最少使用的連接。broker之間的監聽器是配置inter.broker.listener.name 決定的
    所謂優先關閉,是指在諸多 TCP 連接中找出最近未被使用的那個。這裡「未被使用」就是說,在最近一段時間內,沒有任何 Request 經由這個連接被發送到 Processor 執行緒。

RequestChannel

這個類保存這所有的Processor,還有一個阻塞隊列保存這待處理請求。這個隊列最大長度由queued.max.requests控制,當待處理請求超過這個數值的時候網路就會阻塞

在這裡插入圖片描述

涉及到的Broker配置有:

KafkaApis

具體Request的處理類, 所有的請求方法處理邏輯都放在這個裡面。

KafkaRequestHandlerPool

KafkaRequestHandler的執行緒池,KafkaRequestHandler執行緒的數量由配置num.io.threads決定。

在這裡插入圖片描述

涉及到的Broker配置有:

KafkaRequestHandler

請求處理類, 每個Handler都會去 requestChannel的requestQueue隊列裡面poll請求, 然後去處理,最終調用的處理方法是KafkaApis.handle()

這幾個類之間的關係如下

在這裡插入圖片描述

通訊流程總結

在這裡插入圖片描述

  1. KafkaServer啟動的時候,會根據listeners的配置來初始化對應的實例。
  2. 一個listeners對應一個Acceptor,一個Acceptor持有若干個(num.network.threads)Processor實例。
  3. Acceptor 中的nioSelector註冊的是ServerSocketChannel通道,並監聽OP_ACCEPT事件,它只負責 TCP 創建和連接,不包含讀寫數據。
  4. 當Acceptor監聽到新的連接之後,就會通過調用socketChannel = serverSocketChannel.accept()拿到SocketChannel,然後把SocketChannel保存在Processor裡面的newConnection隊列中。 那麼具體保存在哪個Processor中呢?當然是輪詢分配了,確保負載均衡嘛。當然每個Processor的newConnection隊列最大只有20,並且是程式碼寫死的。如果一個Processor滿了,則會尋找下一個存放,如果所有的都滿了,那麼就會阻塞。一個Acceptor的所有Processor最大能夠並發處理的請求是 20 * num.network.threads。
  5. Processor會持續的從自己的newConnection中poll數據,拿到SocketChannel之後,就把它註冊到自己的Selector中,並且監聽事件 OP_READ。 如果newConnection是空的話,poll的超時時間是 300ms。
  6. 監聽到有新的事件,比較READ,則會讀取數據,並且解析成Request, 把Request放入到 RequestChannel中的requestQueue阻塞隊列中。所有的待處理請求都是臨時放在這裡面。這個隊列也有最大值queued.max.requests(默認500),超過這個大小就會阻塞。
  7. KafkaRequestHandlerPool中創建了很多(num.io.threads(默認8))的KafkaRequestHandler,用來處理Request, 他們都會一直從RequestChannel中的requestQueue隊列中poll新的Request,來進行處理。
  8. 處理Request的具體邏輯是KafkaApis裡面。當Request處理完畢之後,會調用requestChannel.sendResponse()返回Response。
  9. 當然,請求Request和返回Response必須是一一對應的, 你這個請求是哪個Processor監聽到的,則需要哪個Processor返回, 他們通過id來標識。
  10. Response也不是裡面返回的,而是先放到Processor中的ResponseQueue隊列中,然後慢慢返回給客戶端。

數據面板(DataPlane)

數據面板是用來處理 Broker與Broker/Client之間的網路模型模組, 與之相對的是控制器面板。

控制器面板 是專門用於Controller與Broker之間的網路通訊模組。

其實本質上他們都是一模一樣的, 但是為了將Controller的通訊和普通通訊隔離,才有這麼兩個概念。

上面的網路通訊模型就是以數據面板來分析的,因為本質是一樣的, 只是有一些配置不一樣。

那麼,數據面板就不詳細講了,我們主要講下控制器面板的不一樣的地方。

控制器面板(ControllerPlane)

控制器面板是用來專門處理 Controller相關請求的獨立通訊模組。

大家都知道,Controller是一個很重要的角色,基本上大部分協調整個集群的相關請求都跟它有關係, 比如創建Topic、刪除Topic、分區副本重分配、等等。他們都很重要。

但是一般情況下數據面板的請求很多,如果因為請求過多而導致Controller相關請求被阻塞不能執行,那麼可能會造成一些影響, 所以我們可以讓Controller類的請求有一個單獨的通訊模組。

首先,要啟用控制器面板,必須配置control.plane.listener.name. 並且這個監聽器名稱必須在listeners裡面有配置

否則的話,是不會專用的控制器鏈接的EndPoint的。

例如:

Broker配置

## 所有的監聽器
isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094

## 監聽器對應的安全協議
listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL

## 控制器
control.plane.listener.name = CONTROLLER

在啟動時,代理將開始使用安全協議「SSL」監聽「192.1.1.8:9094」。

在控制器端,當它通過 zookeeper 發現代理髮布的端點時,它將使用 control.plane.listener.name 找到端點,它將用於建立與代理的連接。

  1. 必須配置control.plane.listener.name 才能使用獨立的控制器面板
  2. 控制器面板的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那麼大的並發
  3. 跟DataPlane相關隔離,互不影響。但是連接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的
  4. 控制類面板只有一個Acceptor和一個Processor,這個跟數據面板的區別是 DataPlane的Processor可以有多個。

涉及到的Broker配置有:

上面我們主要分析了一下, Kafka中的網路通訊模型, 那麼聰明的你應該肯定能夠看的出來,它是使用執行緒模型中的 Reactor模式來實現的。

執行緒模型: Reactor模式

該模組詳細請參考Reactor 模型

Reactor 模式,是指通過一個或多個輸入同時傳遞給服務處理器的服務請求的事件驅動處理模式。

服務端程式處理傳入多路請求,並將它們同步分派給請求對應的處理執行緒,Reactor 模式也叫 Dispatcher 模式。
即 I/O 多路復用統一監聽事件,收到事件後分發(Dispatch 給某進程),是編寫高性能網路伺服器的必備技術之一。

根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:

  1. 單 Reactor 單執行緒;
  2. 單 Reactor 多執行緒;
  3. 主從 Reactor 多執行緒。

我們主要了解一下 主從Reactor 多執行緒

該圖來源於網路

針對單 Reactor 多執行緒模型中,Reactor 在單執行緒中運行,高並發場景下容易成為性能瓶頸,可以讓 Reactor 在多執行緒中運行。

方案說明:

  • Reactor 主執行緒 MainReactor 對象通過 Select 監控建立連接事件,收到事件後通過 Acceptor 接收,處理建立連接事件;
  • Acceptor 處理建立連接事件後,MainReactor 將連接分配 Reactor 子執行緒給 SubReactor 進行處理;
  • SubReactor 將連接加入連接隊列進行監聽,並創建一個 Handler 用於處理各種連接事件;
  • 當有新的事件發生時,SubReactor 會調用連接對應的 Handler 進行響應;
  • Handler 通過 Read 讀取數據後,會分發給後面的 Worker 執行緒池進行業務處理;
  • Worker 執行緒池會分配獨立的執行緒完成真正的業務處理,如何將響應結果發給 Handler 進行處理;
  • Handler 收到響應結果後通過 Send 將響應結果返回給 Client。

更詳細的介紹可以看 Reactor 模型

問答

(1)Kafka的網路模型使用了Reactor模式的哪種實現方式?

  1. 單 Reactor 單執行緒;
  2. 單 Reactor 多執行緒;
  3. 主從 Reactor 多執行緒。

答案: 3 。 使用了主從Reactor多執行緒的實現方式.

在這裡插入圖片描述

MainReactor(Acceptor)只負責監聽OP_ACCEPT事件, 監聽到之後把SocketChannel 傳遞給 SubReactor(Processor), 每個Processor都有自己的Selector。SubReactor會監聽並處理其他的事件,並最終把具體的請求傳遞給KafkaRequestHandlerPool。

很典型的主從Reactor多執行緒模式。

(2)什麼是ControllerPlane(控制器面板),什麼是DataPlane(數據面板)?

控制器面板: 主要處理控制器類的的請求
數據面板: 主要處理數據類的請求。

讓他們隔離,互不影響,比如說普通的請求太多,導致了阻塞, 那麼Controller相關的請求也可能被阻塞了,所以讓他們隔離,不會互相影響。

但是默認情況下, ControllerPlane是沒有設置的,也就是Controller相關的請求還是走的DataPlane。 想要隔離的話必須設置control.plane.listener.name .

  1. 必須配置control.plane.listener.name
  2. 控制器面板的RequestChannel中的requestQueue不是由queued.max.requests控制的,而是寫死的 20. 因為控制類請求不會有那麼大的並發
  3. 跟DataPlane相關隔離,互不影響。但是連接限流ConnectionQuotas是共享的,限流的時候,兩個是算在一起的
  4. 控制類面板只有一個Acceptor和一個Processor,這個跟數據面板的區別是 DataPlane的Processor可以有多個。

 

點擊關注,第一時間了解華為雲新鮮技術~