Apache Kafka分散式流處理平台及大廠面試寶典v3.0.0
概述
**本人部落格網站 **IT小神 www.itxiaoshen.com
定義
Apache Kafka官網地址 //kafka.apache.org/ 最新版本為 3.0.0
Apache Kafka是一個開源的分散式事件流平台,使用Scala和Java混合編寫,Kafka最初由Linkedin公司開發,2011年貢獻給了Apache基金會並成為頂級開源項目。消息隊列就是用於數據生產方和消費方解耦合的中間件。顧名思義,主體就是一個隊列的形式收集消息,數據在消費端按照FIFO的原則被消費。
Apache Kafka主要以Java 8、11和17源碼構建及測試,但Java 8支援從Apache Kafka 3.0開始就已棄用,並將在Apache Kafka 4.0中被移除。
近幾天連續學習兩個Apache的開源項目,今天我們又來學習另外一個Apache頂級開源項目Kafka,可以見得Apache在開源世界的絕對大佬地位。Kafka是一個基於Zookeeper協調的支援分區(partition)、多副本(replica)的分散式消息系統,最大特性是可以實時處理大量數據以滿足各種需求場景,常用於大數據場景消息流中間件;其他消息隊列有ActiveMQ、RabbitMQ、ZeroMQ、MetaMQ、RocketMQ,目前比較主流消息中間件是Kafka、RocketMQ和RabbitMQ。
核心能力
- 高吞吐量
- Kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,即使存儲TB量級消息也能有穩定的性能。
- 可擴展
- Kafka集群支援動態擴展,可彈性伸縮將生產集群規模擴大到上千個代理和數十萬個分區。
- 持久化
- 將數據流安全地存儲在分散式、持久、容錯的集群中。
- 高可用
- 分散式、分區、複製和容錯,支援數據備份,在可用性區域上有效地擴展集群或者跨地理區域連接獨立的集群。
應用場景
Kafka作為一個消息中間件最基本的用途為解耦、非同步、削峰、消息通訊,下面為其常用的場景:
- 日誌收集:可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer。
- 消息系統:分散式發布、訂閱消息系統,消息隊列將消息生產者和訂閱者分離,實現應用解耦和快取消息等。
- 流量削峰:在應用前端以消息隊列接收請求,當請求超過隊列長度,直接不處理重定向至一個靜態頁面,來達到削峰的目的,比如用於秒殺活動,因為秒殺活動高度集中用戶訪問導致流量暴增,可能導致應用系統雪崩。
- 流式處理:結合主流的流式處理框架如Flink、Spark Streaming、Storm。
- 用戶活動跟蹤:kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動資訊被各個伺服器發布到kafka的topic中,然後消費者通過訂閱這些topic來做實時的監控分析,亦可保存到資料庫。
- 運營指標:kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作的集中回饋,比如報警和報告。
安裝
我們這裡規劃部署192.168.50.34、192.168.50.35、192.168.50.36共3個節點的Kafka集群,當然是需要有基本JDK環境,Kafka部署還需依賴ZooKeeper,剛好上一篇文章我們也非常愉快的學習和部署Zookeeper集群,直接拿來使用,奧利給!
#官網下載
wget --no-check-certificate //dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
#解壓
tar -zxvf kafka_2.13-3.0.0.tgz
#進入kafka目錄下創建logs文件夾
cd kafka_2.13-3.0.0
mkdir logs
#並修改config目錄的server.properties
vim config/server.properties
修改server.properties配置文件內容如下:
// 作為當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
broker.id=1
// 監聽地址和埠號,默認是9092
listeners=PLAINTEXT://192.168.50.34:9092
// 消息存放的目錄
log.dirs=/home/commons/kafka_2.13-3.0.0/logs
// zookeeper集群地址
zookeeper.connect=192.168.50.34:2181,192.168.50.35:2181,192.168.50.36:2181
#然後將上整個kafka_2.13-3.0.0拷貝到另外兩台192.168.50.35、192.168.50.36上的相同目錄(我們這裡是指/home/commons),配置文件主要修改broker.id和listeners,broker.id唯一那我們就順序編號為2和3,listeners中的host就分配與主機地址對應即可,修改完成後我們可以在kafka根目錄下分別啟動三台kafka server,後台常駐方式帶上參數 -daemon
./bin/kafka-server-start.sh -daemon config/server.properties
#啟動後可以通過ps或者jps檢查進程資訊和logs下的日誌文件如server.log
ps -ef | grep kafka
#停止kafka server
./bin/kafka-server-stop.sh
集群簡單測試
- 創建、列出所有topic、查看指定的topic
#3.0版本官方使用-–bootstrap-server替代–-zookeeper,創建名稱為mytopic1的Topic,指定分區數為1,分區的副本數為1
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 1 --partitions 1 --topic mytopic1
#創建名稱為mytopic2的Topic,指定分區數為2,分區的副本數為2
bin/kafka-topics.sh --create --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --replication-factor 2 --partitions 2 --topic mytopic2
#列出所有的topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092
#查看指定topic
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
$bin/kafka-topics.sh --describe --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2
- 增加Topic的partition數量和查看指定查看 topic 指定分區 offset 的最大值或最小值和刪除topic
#將mytopic1的分區數量擴充到5個
bin/kafka-topics.sh --alter --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 5 --topic mytopic1
#time 為 -1 時表示最大值,為 -2 時表示最小值:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic1 --time -1 --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --partitions 0
- 刪除topic
#刪除mytopic2
kafka-topics.sh --delete --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic2
- 生產和消費消息
#往mytopic1里生產消息,這裡可以使用--broker-list也可以使用--bootstrap-server
./bin/kafka-console-producer.sh --broker-list 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1
#從頭開始消費消息
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --from-beginning
#從尾部開始消費消息,從尾部開始取數據,必需要指定分區:如果需要取指定個數消息可以加上如--max-messages 5的參數
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --offset latest --partition 1
發送多條消息,最終在該分區下收到分發的該分區下的消息
- 指定消費者組
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 -group mygroup1 --from-beginning
- 查看消費者組列表
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --list
- 查看消費者組詳情和其他
#消費者組詳情./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --describe#刪除消費者組./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --group mygroup1 --delete#平衡Leader,--partition:指定需要重新分配leader的partition編號./bin/kafka-leader-election.sh --bootstrap-server 192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092 --topic mytopic1 --partition=2 --election-type preferred#此外還有kafka自帶壓測工具./bin/kafka-producer-perf-test.sh --topic mytopic1 --num-records 1000 --record-size 1 --throughput 1000 --producer-props bootstrap.servers=192.168.50.34:9092,192.168.50.35:9092,192.168.50.36:9092
架構原理面試寶典
Kafka的架構和組成
一個典型的 Kafka 體系架構包括若干 Producer、若干 Broker、若干 Consumer,以及一個 ZooKeeper 集群,如下圖所示。其中 ZooKeeper 是 Kafka 用來負責集群元數據的管理、控制器的選舉等操作的。Producer 將消息發送到 Broker,Broker 負責將收到的消息存儲到磁碟中,而 Consumer 負責從 Broker 訂閱並消費消息。
- Broker:服務代理節點,Kafka 集群包含一個或多個伺服器,這種伺服器被稱為 broker。對於 Kafka 而言,Broker 可以簡單地看作一個獨立的 Kafka 服務節點或 Kafka 服務實例。大多數情況下也可以將 Broker 看作一台 Kafka 伺服器,前提是這台伺服器上只部署了一個 Kafka 實例。一個或多個 Broker 組成了一個 Kafka 集群。
- Topic:每條發布到 Kafka 集群的消息都有一個類別,這個類別被稱為 Topic, Topic屬於邏輯概念,也即是物理上不一樣 Topic 的消息分開存儲但邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上,但用戶只需指定消息的 Topic 便可生產或消費數據而沒必要關心數據存於何處。
- Partition:Partition 屬於物理概念,Topic的分區,每一個 Topic 包含一個或多個 Partition,分區的作用是做負載,提高kafka的吞吐量,同一個topic在不同的分區的數據是不重複的,partition的表現形式就是一個一個的文件夾。分區主要實現擴展和提高並發,以partition作為讀寫單位,可以同時多個生產者消費者讀寫。
- Replication:每一個分區可以有一個主分區(Leader)和多個副本(Follower),副本的作用是做備份。當主分區故障的時候會選擇一個副本成為新的Leader。在kafka中副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
- Producer: 生產者,也就是發送消息的一方。生產者負責創建消息,然後將其投遞到 Kafka 中,負責發布消息到 Kafka broker。
- Consumer:消費者,也就是接收消息的一方,向 Kafka broker 讀取消息的客戶端,消費者連接到 Kafka 上並接收消息,進而進行相應的業務邏輯處理。
- Consumer Group:每一個 Consumer 屬於一個特定的 Consumer Group(可為每一個 Consumer 指定 group name,若不指定 group name 則屬於默認的 group),我們可以將多個消費者組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的數據,這也是為了提高kafka的吞吐量。
- Message:每一條發送的消息主體,也即是消息內容。
- Zookeeper:kafka集群依賴zookeeper來保存集群的的元資訊,來保證系統的可用性,管理broker上下線、Topic的分區和副本分配、leader選舉、consumer動態擴縮容和觸發負載均衡,維護消費者關係和每個partition分區的消費資訊;broker啟動將ip、埠註冊存儲Zookeeper,存儲topic和partition。
Kafka的四個核心API
- Producer API:生產者API允許應用程式將一組記錄發布到一個或多個Kafka Topic中。
- Consumer AIP:消費者API允許應用程式訂閱一個或多個Topic,並處理向他們傳輸的記錄流。
- Streams API:流API允許應用程式充當串流處理器,從一個或者多個Topic中消費輸入流,並將輸出流生成為一個或多個輸出主題,從而將輸入流有效地轉換為輸出流。
- Connector API:連接器API允許構建和運行可重用的生產者或消費者,這些生產者或消費者將Kafka Topic連接到現有的應用程式或數據系統。例如:連接到關係資料庫的連接器可以捕獲對錶的修改資訊。
消息隊列的模式?
-
點對點模式
- 消息生產者發送消息到消息隊列中,然後消息消費者從隊列中取出並且消費消息,消息被消費後,隊列中不在存儲。所以消息消費者不可能消費到已經被消費的消息;隊列支援存在多個消費者,但是對於一個消息而言,只會有一個消費者可以消費;如果想發給多個消費者,則需要多次發送該條消息。
-
發布/訂閱模式(一對多,消費者消費數據之後不會清除消息)
- 消息生產者將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息,和點對點的方式不同,發布到topic的消息會被所有的訂閱者消費;但是消息隊列不是存儲系統數據保留是期限的,例如Kafka默認是7天;kafka就是這種模式的,分為兩種方式
- 一種是是消費者去主動去消費(拉取)消息,而不是生產者推送消息給消費者。
- 另外一種就是生產者主動推送消息給消費者,類似公眾號。
- 消息生產者將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息,和點對點的方式不同,發布到topic的消息會被所有的訂閱者消費;但是消息隊列不是存儲系統數據保留是期限的,例如Kafka默認是7天;kafka就是這種模式的,分為兩種方式
Kafka的存儲模型
- Kafka中消息是以Topic進行分類的,生產者生產消息,消費者消費消息,都是面向Topic的。而Topic在物理上的存儲是分區存儲的,即按Partition分散式存儲。
- 每個Partition中的數據又是順序寫入log文件中進行存儲。這樣會出現分區log文件過大,導致的讀取性能下降的問題。所以Kafka將log文件切分成了segment,每個segment由 .log數據存儲文件 和 .index索引文件 和 .timeindex文件組成。其中.log用於存儲消息本身的數據內容,.index存儲消息在文件中的位置(包括消息的邏輯offset和物理存儲offset),.timeindex*存儲消息創建時間和對應邏輯地址的映射關係。詳細的結構如下圖所示:
- 每個log文件和index文件的命名就是 文件中起始數據的偏移量,一個segment中由index定位到對應log文件中執行數據的原理如下圖:
- Kafka將分區拆分成多個段是為了控制存儲的文件大小,如果整個分區只保存為一個文件,那隨著分區里消息的增多,文件也將越來越大,最後不可控制。而如果每個消息都保存為一個文件,那文件數量又將變得巨大,同樣容易失去控制。所以kafka採用段設計方式,控制了每個文件的大小和文件的數量。同時可以很方便地通過作業系統mmap機制映射到記憶體中,提高寫入和讀取效率。還有另一個好處是當系統要清除過期數據時可以直接將過期的段文件刪除即可。但是這裡也會有一個問題,如果每個消息都要在index文件中保存位置資訊,那麼index文件也很容易變得很大,這樣又會減弱上文所說的好處。所以在kafka中,index設計為稀疏索引來降低index的文件大小,這樣,index文件存儲的實際內容為:該段消息在消息隊列中的相對offset和在log文件中的物理偏移量映射的稀疏記錄。那麼多少條消息會在index中保存一條記錄呢?這個可以通過系統配置來進行設置。索引記錄固定為8個位元組大小,分別為4個位元組的相對offset(消息在partition中全局offset減去該segment的起始offset),4個位元組的消息具體存儲文件的物理偏移量。
- index文件中根據需要查找的offset根據保存起始偏移量(文件名)的相對偏移量,定位到log中數據真實的位置。(類似HBase中採用LSM日誌結構合併樹設計思想,順序寫入HFile,磁頭定址次數少,順序讀/寫性能好),Kafka不會在消費者拉取完消息後馬上就清理消息,而是會保存段文件一段時間,直到其過期再標記為可清理,由後台程式定期進行清理。這種機制使得消費者可以重複消費消息,滿足更靈活的需求。
- 查找機制:建立在offset為有序的基礎上,利用segment+有序offset+稀疏索引+二分查找+順序查找等多種手段來高效的查找數據。
- 存儲策略:無論消息是否被消費,kafka都會保存所有的消息。那對於舊數據有什麼刪除策略呢?
- 基於時間,默認配置是168小時(7天)。
- 基於大小,默認配置是1073741824。
- 需要注意的是kafka讀取特定消息的時間複雜度是O(1),所以刪除過期的文件並不會提高kafka的性能。
說說Kafka消費者端使用的技術點?
-
消費模式
- Kafka的消費模式是poll模式,就是每個消費者按照自己的消費能力在Broker中讀取數據。從 Broker 主動拉取數據,需要維護一個長輪詢,針對這一點, Kafka 的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,Consumer 會等待一段時間之後再返回,使用這個機制解決了沒有新的數據消費者就在循環中空轉開銷問題。
-
分區分配策略
- 如果Consumer組中Consumer數量>Topic Partition數量根據Random-Robin策略(實際是個輪詢)或者Range(劃分範圍,一個範圍內的給到一個消費者)對Partition進行分配。此後這個消費者組中的消費者消費的Partition就被定下來了。所以在實際的應用中,建議消費者組的consumer的數量與partition的數量一致。
-
Offset 維護
- comsumer需要記錄已經消費到的偏移量以便故障或者後續繼續消費。在0.9版本後Kafka將這些資訊都保存在一個內置的Topic(
__comsumer_offset
),默認5s自動提交一次,而此前的版本是保存在ZK中的,這樣改進目的是:- 優化效率,減輕ZK壓力。
- 可以自己實現偏移量維護。
- comsumer需要記錄已經消費到的偏移量以便故障或者後續繼續消費。在0.9版本後Kafka將這些資訊都保存在一個內置的Topic(
Kafka發送數據的流程和消息結構?
- Producer在寫入數據的時候永遠的找leader,不會直接將數據寫入follower,發送的流程如下圖所示。
- 消息寫入leader後,follower是主動的去leader進行同步的。producer採用push模式將數據發布到broker,每條消息追加到分區中,順序寫入磁碟,所以保證同一分區內的數據是有序的。
- Message結構:上面說到log文件就實際是存儲message的地方,我們在producer往kafka寫入的也是一條一條的message,將生產者發送數據封裝為ProducerRecord對象,包括topic、partition、時間戳、key、value、header等;封裝消息結構包含消息體、消息大小、offset、壓縮類型等。
- offset:offset是一個佔8byte的有序id號,它可以唯一確定每條消息在parition內的位置。
- 消息大小:消息大小佔用4byte,用於描述消息的大小。
- 消息體:消息體存放的是實際的消息數據(被壓縮過),佔用的空間根據具體的消息而異。
Kafka數據分區的目的和原則?
- 數據會寫入到不同的分區,分區的主要目的是:
- 增加讀寫的吞吐量。
- 方便擴展,因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕鬆的應對日益增長的數據量。
- 提高並發,以partition為讀寫單位,可以多個消費者同時消費數據,提高了消息的處理效率。
- 分區原則:如果某個topic有多個partition,kafka的producer選擇分區情況如下:
- partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
- 如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
- 如果既沒指定partition,又沒有設置key,則會通過random-robin演算法輪詢得到分區(第一個得到一個隨機數,後續的在此基礎上自增)。
Kafka如何保證消息不丟失或者可靠性?
為保證 Producer 發送的數據,能可靠地發送到指定的 Topic,Topic 的每個 Partition 收到 Producer 發送的數據後,都需要向 Producer 發送 ACK 確認收到,如果 Producer 收到 ACK,就會進行下一輪的發送,否則重新發送數據。
-
副本數據同步策略
- 何時發送 ACK?確保有 Follower 與 Leader 同步完成,Leader 再發送 ACK,這樣才能保證 Leader 掛掉之後,能在 Follower 中選舉出新的 Leader 而不丟數據。
- 多少個 Follower 同步完成後發送 ACK?全部 Follower 同步完成,再發送 ACK。
-
ISR機制
- 如果採用第二種方案,所有 Follower 完成同步,Producer 才能繼續發送數據,設想有一個 Follower 因為某種原因出現故障,那 Leader 就要一直等到它完成同步。這個問題怎麼解決?
- Leader維護了一個動態的 in-sync replica set(ISR,同步副本的作用),只需要這個列表的中的follower和leader同步,當ISR中的follower完成數據的同步之後,leader就會給生產者發送ack。
- 當 ISR 集合中的 Follower 完成數據的同步之後,Leader 就會給 Follower 發送 ACK。
- 如果 Follower 長時間未向 Leader 同步數據,則該 Follower 將被踢出 ISR 集合,該時間閾值由 replica.lag.time.max.ms 參數設定。Leader 發生故障後,就會從 ISR 中選舉出新的 Leader。
- Leader和follower(ISR)落盤才會返回ack,會有數據重複現象,如果在leader已經寫完成,且follower同步完成,但是在返回ack的出現故障,則會出現數據重複現象;極限情況下,這個也會有數據丟失的情況,比如follower和leader通訊都很慢,所以ISR中只有一個leader節點,這個時候,leader完成落盤,就會返回ack,如果此時leader故障後,就會導致丟失數據
- 如果採用第二種方案,所有 Follower 完成同步,Producer 才能繼續發送數據,設想有一個 Follower 因為某種原因出現故障,那 Leader 就要一直等到它完成同步。這個問題怎麼解決?
-
ACK 應答機制
- 對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
- 為保證生產者發送的數據能可靠的發送到指定的topic,Kafka在Topic的每個partition收到生產者發送的數據後,都需要向生產者發送ack(確認收到),如果生產者收到ack,就會進行下一輪的發送,否則重新發送數據。也即是核心是通過ACK應答機制,在生產者向隊列寫入數據的時候可以設置參數來確認是否kafka接收到數據,這個參數可設置的值為如下:
- 0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
- 1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
- -1代表all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低,這裡的all是指ISR列表中。
-
如果往不存在的topic寫數據,kafka會自動創建topic,分區和副本的數量根據默認配置都是1。
-
生產者:同步發送,或者通過發送的callback來實現,producer.send(msg,callback)。
-
消費者:需要考慮先更新offset還是先做消費處理,先做消費則可能引出重複消費。
Kafka Broker節點故障時處理細節?
- 首先了解下LEO(每個副本最大的 Offset)和HW(消費者能見到的最大的 Offset,ISR 隊列中最小的 LEO)。
- Follower 故障:Follower 發生故障後會被臨時踢出 ISR 集合,待該 Follower 恢復後,Follower 會 讀取本地磁碟記錄的上次的 HW(high watermark,ISR中所有副本中結尾offset的最小值),並將 log 文件高於 HW 的部分截取掉,從 HW 開始向 Leader 進行同步數據操作。等該 Follower 的 LEO 大於等於該 Partition 的 HW,即 Follower 追上 Leader 後,就可以重新加入 ISR 了。
- Leader 故障:Leader 發生故障後,會從 ISR 中選出一個新的 Leader,之後,為保證多個副本之間的數據一致性,其餘的 Follower 會先將各自的 log 文件高於 HW 的部分數據截掉,重新向新的leader同步。
- 注意:這隻能保證副本之間的數據一致性,並不能保證數據不丟失或者不重複。
Kafka如何保證消息不重複消費?
- 將伺服器的 ACK 級別設置為 -1,可以保證 Producer 到 Server 之間不會丟失數據,即 At Least Once 語義,但是會出現數據重複(at least once)。
- 將伺服器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次,即 At Most Once 語義,但是不能保證數據不丟失(at most once)。
- 對於一些非常重要的資訊,比如交易數據,下游數據消費者要求數據既不重複也不丟失,即 Exactly Once 語義。在0.11版本後,引入冪等性解決kakfa集群內部的數據重複,在0.11版本之前,在消費者處自己做處理:Producer 不論向 Server 發送多少重複數據,Server 端都只會持久化一條。即:At Least Once + 冪等性 = Exactly Once。
- 如果啟用了冪等性,只需要將 Producer 的參數中 enable.idompotence 設置為 true 即可,且ack默認就是-1,kafka就會為每個生產者分配一個pid,並為每條消息分配Sequence Number,如果pid、partition、seqnumber三者一樣,也即是對 <PID,Partition,SeqNumber> 做快取,當具有相同主鍵的消息提交時,Broker kafka認為是重複數據只會持久化一條;但是如果生產者掛掉後,也會出現有數據重複的現象;所以冪等性解決在單次會話的單個分區的數據重複,但是在分區間或者跨會話的是數據重複的是無法解決的。
Kafka如何保證消費數據的一致性?
- 消費數據的一致性主要通過HW來保證。
- LEO:指每個follower的最大的offset。
- HW(高水位):指消費者能見到的最大的offset,LSR隊列中最小的LEO,也就是說消費者只能看到1~6的數據,後面的數據看不到,也消費不了,避免leader掛掉後,比如當前消費者消費8這條數據後,leader掛了,此時比如f2成為leader,f2根本就沒有9這條數據,那麼消費者就會報錯,所以設計了HW這個參數,只暴露最少的數據給消費者,避免上面的問題。
Kafka如何處理消息積壓?
增加新topic,增大分區,將原來topic數據消費轉移到這個新topic,然後開多個消費者去處理新topic。
Kafka如何保證消息順序消費?
- Kafka不能保證消息的全局有序,只能保證消息在partition內有序,因為消費者消費消息是在不同的partition中隨機的。如要保證可以使用一個topic、一個partition、一個消費者且內部單執行緒處理實現消息順序消費,但一般不建議這樣做消費性能較低。
- 另外思路是採用N個queue,將相同key的數據寫入同一個queue,N個執行緒每個執行緒處理一個queue。
說說Kafka的事務?
Kafka事務在0.11版本後引入,主要解決的是Producer在Exactly Once語義上跨分區跨會話的精準一次寫入,要麼成功要麼失敗。
-
Producer事務(斷點續傳)
- 為了實現跨分區跨會話的事務,每個Producer需要一個全局唯一的Transaction ID,並將Producer獲得的PID和Transaction ID綁定。這樣當Producer重啟後就可以通過正在進行的Transaction ID獲得原來的PID。
- 為了管理Transaction,Kafka引入了一個新的組件Transaction Coordinator(事務調度器)。Producer就是通過和Transaction Coordinator交互獲得綁定的PID和對應的任務狀態。Transaction Coordinator還負責將事務所有寫入Kafka的一個內部Topic,這樣即使整個服務重啟,由於事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。
-
注意:Kakfa事務回滾不會直接去刪除消息,而是將消息對Consumer不可見。
-
Consumer事務
- Kafka對Consumer的事務較弱,一般是通過Consumer端自己實現精確一次性消費(將消費過程和提交offset作為一個原子操作實現)。
Kafka如何通過ZooKeeper來進行選舉和狀態更新?
- 首先Kafka集群啟動時,會從Broker中選舉一個Controller(分散式鎖實現搶先創建臨時節點的broker當選),負責管理集群所有Broker的上下線(監聽zk的/brokers/ids/節點)、所有Topic分區副本分配、leader選舉等工作。
- 當某個Broker掛了以後,Controller監聽到臨時節點/brokers/ids/中的變化,從ZK各個分區狀態資訊中獲取ISR(此時去除了掛掉節點所有的Partition,失去leader的Partition重新選舉leader),並完成ZooKeeper各個分區狀態更新。
Kafka高效讀寫的保證?
-
Kafka高性能設計包括多分區、順序讀寫、page cache、預讀、記憶體映射-零拷貝、無鎖offset 管理機制、NIO、壓縮、批量讀寫、高性能序列化(二進位)等。
-
順序讀寫:如同HBase順序寫HFile文件一樣,Kafka順序寫log文件寫入磁碟效率極高(據Kafka官網文檔說比隨機寫快6000倍)。
-
使用作業系統的Page Cache來快取要寫入的數據,好處在於:
- 寫入前可以做一些優化,提高磁碟寫入性能。
- 快取也可以用於數據被讀取,當數據寫入與讀取速率相近的情況下,可以直接記憶體讀取。
- Page Cache非JVM記憶體,不會影響JVM,導致GC的增加。同時,Kafka節點宕機,數據還在此機器快取。
-
零拷貝機制:
- 正常情況下,先把數據讀到內核空間,在從內核空間把數據讀到用戶空間,然後在調作業系統的io介面寫到內核空間,最終在寫到硬碟中。磁碟 -> 內核page cache -> 應用快取 -> 內核page cache -> 磁碟的過程。
- 如果數據只是單純的拷貝而不需要修改,那麼拷貝到應用快取的步驟完全是多餘的。所以Kafka利用了作業系統提供的零拷貝機制,來減少不需要的系統調用和數據拷貝次數,直接在內核空間流轉io流,所以kafka的性能非常高。
本篇主要是以Kafka的基礎理論、架構原理和面試為主,後續我們再分享Kafka有關 API 以及事務、攔截器、監控等高級篇,已達到Kafka實戰編程和應用的目的。