Linux安裝Kafka
- 2019 年 10 月 5 日
- 筆記
kafka是一個分散式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、非同步處理的作用。kafka對外使用topic的概念,生產者往topic里寫消息,消費者從讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數量來進行橫向擴容。單個parition內是保證消息有序。每新寫一條消息,kafka就是在對應的文件append寫,所以性能非常高。
v基礎知識
什麼是消息隊列(Message Queue)?
消息(Message)
網路中的兩台電腦或者兩個通訊設備之間傳遞的數據。例如說:文本、音樂、影片等內容。
隊列(Queue)
一種特殊的線性表(數據元素首尾相接),特殊之處在於只允許在首部刪除元素和在尾部追加元素。入隊、出隊。
消息隊列(MQ)
消息+隊列,保存消息的隊列。消息的傳輸過程中的容器;主要提供生產、消費介面供外部調用做數據的存儲和獲取。
MQ分類
MQ主要分為兩類:點對點(p2p)、發布訂閱(Pub/Sub)
共同點:
消息生產者生產消息發送到queue中,然後消息消費者從queue中讀取並且消費消息。
不同點:
p2p模型包括:消息隊列(Queue)、發送者(Sender)、接收者(Receiver)
一個生產者生產的消息只有一個消費者(Consumer)(即一旦被消費,消息就不在消息隊列中)。比如說打電話。
Pub/Sub包含:消息隊列(Queue)、主題(Topic)、發布者(Publisher)、訂閱者(Subscriber)。每個消息可以有多個消費者,彼此互不影響。比如我發布一個微博:關注我的人都能夠看到。
那麼在大數據領域呢,為了滿足日益增長的數據量,也有一款可以滿足百萬級別消息的生成和消費,分散式、持久穩定的產品——Kafka。
vKafka概念
在要了解Kafka之前,必須先了解主題,經紀人,生產者和消費者等主要術語。 下圖說明了主要術語,表格詳細描述了圖表組件。如已了解的可以跳過此部分。
在上圖中,主題配置為三個分區。 分區1具有兩個偏移因子0和1.分區2具有四個偏移因子0,1,2和3.分區3具有一個偏移因子0.副本的id與承載它的伺服器的id相同。
假設,如果主題的複製因子設置為3,那麼Kafka將創建每個分區的3個相同的副本,並將它們放在集群中以使其可用於其所有操作。 為了平衡集群中的負載,每個代理都存儲一個或多個這些分區。 多個生產者和消費者可以同時發布和檢索消息。
Topics(主題):每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)
Partition(分區):parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件
Partition offset(分區偏移):每個分區消息具有稱為 offset 的唯一序列標識。
Replicas of partition(分區備份):副本只是一個分區的備份。 副本從不讀取或寫入數據。 它們用於防止數據丟失。
Broker:Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker
Brokers(經紀人):代理是負責維護髮布數據的簡單系統。 每個代理中的每個主題可以具有零個或多個分區。 假設,如果在一個主題和N個代理中有N個分區,每個代理將有一個分區。假設在一個主題中有N個分區並且多於N個代理(n + m),則第一個N代理將具有一個分區,並且下一個M代理將不具有用於該特定主題的任何分區。假設在一個主題中有N個分區並且小於N個代理(n-m),每個代理將在它們之間具有一個或多個分區共享。 由於代理之間的負載分布不相等,不推薦使用此方案。
Kafka Cluster(Kafka集群):Kafka有多個代理被稱為Kafka集群。 可以擴展Kafka集群,無需停機。 這些集群用於管理消息數據的持久性和複製。
Producers(生產者):生產者是發送給一個或多個Kafka主題的消息的發布者。 生產者向Kafka經紀人發送數據。 每當生產者將消息發布給代理時,代理只需將消息附加到最後一個段文件。 實際上,該消息將被附加到分區。 生產者還可以向他們選擇的分區發送消息。
Consumers(消費者):消費消息。每個consumer屬於一個特定的consumer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。
Consumer Group(消費者組):是邏輯上的概念,是Kafka實現單播和廣播兩種消息模型的手段。同一個topic的數據,會廣播給不同的group;同一個group中的worker,只有一個worker能拿到這個數據。換句話說,對於同一個topic,每個group都可以拿到同樣的所有數據,但是數據進入group後只能被其中的一個worker消費。group內的worker可以使用多執行緒或多進程來實現,也可以將進程分散在多台機器上,worker的數量通常不超過partition的數量,且二者最好保持整數倍關係,因為Kafka在設計時假定了一個partition只能被一個worker消費(同一group內)。簡單的理解就是,實現了隊列的方式。同一個groupid 的 consumer 屬於一個隊列方式,消費了就完事了
Leader(領導者): Leader 是負責給定分區的所有讀取和寫入的節點。 每個分區都有一個伺服器充當Leader.
Follower(追隨者):跟隨領導者指令的節點被稱為Follower。 如果領導失敗,一個追隨者將自動成為新的領導者。 跟隨者作為正常消費者,拉取消息並更新其自己的數據存儲。
Kafka的特性:
- 可靠性:Kafka是分散式,分區,複製和容錯的。
- 可擴展性:Kafka消息傳遞系統輕鬆縮放,無需停機。
- 耐用性/持久性:Kafka使用分散式提交日誌,這意味著消息會儘可能快地保留在磁碟上,因此它是持久的。
- 性能:Kafka對於發布和訂閱消息都具有高吞吐量。 即使存儲了許多TB的消息,它也保持穩定的性能。
- 高並發:支援數千個客戶端同時讀寫
使用場景:
- 指標:Kafka通常用於操作監控數據。 這涉及聚合來自分散式應用程式的統計資訊,以產生操作數據的集中饋送。
- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作的集中回饋,比如報警和報告。
- 日誌聚合解決方案:Kafka可用於跨組織從多個服務收集日誌,並使它們以標準格式提供給多個伺服器。
- 消息系統:解耦和生產者和消費者、快取消息等。
- 流處理:流行的框架(如Storm和Spark Streaming)從主題中讀取數據,對其進行處理,並將處理後的數據寫入新主題,供用戶和應用程式使用。 Kafka的強耐久性在流處理的上下文中也非常有用。
安裝Kafka之前,先確認是否已安裝Java和Zookeeper
沒有安裝Java JDK的朋友可以直接看這裡。《CentOS安裝Java JDK》
沒有安裝Zookeeper的朋友可以直接看這裡。《安裝ZooKeeper》
v安裝Kafka
2.1 下載
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
如果下載很慢或者不方便,也可以用這裡已經下載好的壓縮包。鏈接: https://pan.baidu.com/s/1u8mSfubwZupFqKtK6PH6Qw 提取碼: v5em
2.2 解壓
tar -xzf kafka_2.12-2.0.0.tgz
注意,kafka_2.12-2.0.0.tgz版本是已經編譯好的版本,解壓就能使用。
2.3 配置server.properties
默認配置 advertised.listeners=PLAINTEXT://:your.host.name:9092
修改為 advertised.listeners=PLAINTEXT://:ip:9092
ip為伺服器ip。
hostname和埠是用來建議給生產者和消費者使用的,如果沒有設置,將會使用listeners的配置,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port,對於ipv4,基本就是localhost了。
“PLAINTEXT”表示協議,可選的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用”0.0.0.0″表示對所有的網路介面有效,如果hostname為空表示只對默認的網路介面有效。也就是說如果你沒有配置advertised.listeners,就使用listeners的配置通告給消息的生產者和消費者,這個過程是在生產者和消費者獲取源數據(metadata)。
更多介紹:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# ################################################################################## # broker就是一個kafka的部署實例,在一個kafka集群中,每一台kafka都要有一個broker.id # 並且,該id唯一,且必須為整數 ################################################################################## broker.id=10 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = security_protocol://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 ################################################################################## #The number of threads handling network requests # 默認處理網路請求的執行緒個數 3個 ################################################################################## num.network.threads=3 ################################################################################## # The number of threads doing disk I/O # 執行磁碟IO操作的默認執行緒個數 8 ################################################################################## num.io.threads=8 ################################################################################## # The send buffer (SO_SNDBUF) used by the socket server # socket服務使用的進行發送數據的緩衝區大小,默認100kb ################################################################################## socket.send.buffer.bytes=102400 ################################################################################## # The receive buffer (SO_SNDBUF) used by the socket server # socket服務使用的進行接受數據的緩衝區大小,默認100kb ################################################################################## socket.receive.buffer.bytes=102400 ################################################################################## # The maximum size of a request that the socket server will accept (protection against OOM) # socket服務所能夠接受的最大的請求量,防止出現OOM(Out of memory)記憶體溢出,默認值為:100m # (應該是socker server所能接受的一個請求的最大大小,默認為100M) ################################################################################## socket.request.max.bytes=104857600 ############################# Log Basics (數據相關部分,kafka的數據稱為log)############################# ################################################################################## # A comma seperated list of directories under which to store log files # 一個用逗號分隔的目錄列表,用於存儲kafka接受到的數據 ################################################################################## log.dirs=/home/uplooking/data/kafka ################################################################################## # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. # 每一個topic所對應的log的partition分區數目,默認1個。更多的partition數目會提高消費 # 並行度,但是也會導致在kafka集群中有更多的文件進行傳輸 # (partition就是分散式存儲,相當於是把一份數據分開幾份來進行存儲,即劃分塊、劃分分區的意思) ################################################################################## num.partitions=1 ################################################################################## # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. # 每一個數據目錄用於在啟動kafka時恢複數據和在關閉時刷新數據的執行緒個數。如果kafka數據存儲在磁碟陣列中 # 建議此值可以調整更大。 ################################################################################## num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy (數據刷新策略)############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs(平衡) here: # 1. Durability 持久性: Unflushed data may be lost if you are not using replication. # 2. Latency 延時性: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput 吞吐量: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # kafka中只有基於消息條數和時間間隔數來制定數據刷新策略,而沒有大小的選項,這兩個選項可以選擇配置一個 # 當然也可以兩個都配置,默認情況下兩個都配置,配置如下。 # The number of messages to accept before forcing a flush of data to disk # 消息刷新到磁碟中的消息條數閾值 #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush # 消息刷新到磁碟生成一個log數據文件的時間間隔 #log.flush.interval.ms=1000 ############################# Log Retention Policy(數據保留策略) ############################# # The following configurations control the disposal(清理) of log segments(分片). The policy can # be set to delete segments after a period of time, or after a given size has accumulated(累積). # A segment will be deleted whenever(無論什麼時間) *either* of these criteria(標準) are met. Deletion always happens # from the end of the log. # 下面的配置用於控制數據片段的清理,只要滿足其中一個策略(基於時間或基於大小),分片就會被刪除 # The minimum age of a log file to be eligible for deletion # 基於時間的策略,刪除日誌數據的時間,默認保存7天 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. 1G # 基於大小的策略,1G #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. # 數據分片策略 log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies 5分鐘 # 每隔多長時間檢測數據是否達到刪除條件 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
v啟動Kafka
3.1 啟動ZooKeeper
/usr/local/zookeeper-3.4.13/bin/zkServer.sh start
注意,需要先啟動ZooKeeper再啟動kafka,不然會報錯。如下圖:
3.2 啟動kafka
bin/kafka-server-start.sh config/server.properties
啟動Kafka Broker後,在ZooKeeper終端上鍵入命令 jps,效果如下:
3.2 停止kafka
bin/kafka-server-stop.sh config/server.properties
vKafka topic
4.1 創建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
其中demo為創建的topic名稱。
如上圖,創建了一個名為 demo 的主題,其中包含一個分區和一個副本因子。 創建成功之後會輸出: Created topic "demo".
如上圖,創建主題後,系統會在config / server.properties文件中的”/ tmp / kafka-logs /”中指定的創建主題的日誌。
4.2 查詢topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.3 查看topic資訊
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
4.3 刪除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
vKafka 生產/消費
5.1 啟動生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
從上面的語法,生產者命令行客戶端需要兩個主要參數 –
代理列表 – 我們要發送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理埠ID,因為我們知道我們的代理正在偵聽埠9092,因此您可以直接指定它。主題名稱:demo。
5.2 啟動消費者
為了方便測試,另啟一個sheel窗口 這樣效果更明顯。需要注意的是舊版本和新版本的命令是不一樣的
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning
報錯提示: zookeeper is not a recognized option
發現在啟動的時候說使用 –zookeeper是一個過時的方法,最新的版本中命令如下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
可以開啟兩個終端,一個發送消息,一個接受消息。效果如下:
vKafka 部落格總結
Kafka是一個統一的平台,用於處理所有實時數據Feed。 Kafka支援低延遲消息傳遞,並在出現機器故障時提供對容錯的保證。 它具有處理大量不同消費者的能力。 Kafka非常快,執行2百萬寫/秒。 Kafka將所有數據保存到磁碟,這實質上意味著所有寫入都會進入作業系統(RAM)的頁面快取。 這使得將數據從頁面快取傳輸到網路套接字非常有效。
本文中部分內容翻譯或借鑒於以下學習資料,特別鳴謝:
- Quickstart kafka
- Kafka中文文檔
- How to Set Up Kafka
- Kafka 概述
- linux安裝kafka
- How to install kafka on RHEL 8
- linux下安裝kafka
作 者:請叫我頭頭哥
出 處:http://www.cnblogs.com/toutou/
關於作者:專註於基礎平台的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回復。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信我
聲援部落客:如果您覺得文章對您有幫助,可以點擊文章右下角【推薦】一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!