kafka 上手指南:單節點
- 2019 年 11 月 6 日
- 筆記
大家好,我叫謝偉,是一名程式設計師。
今天的主題:kafka 使用指南,單節點版本。
1. 使用場景
如果你是一名後端工程師,設計的應用正常的線上運行,某次秒殺活動,突然間把系統搞崩了,排查系統發現很多的流量沒有處理,導致系統掛了,這個時候有兩種思路: 1. nginx 反向代理,把更多的請求轉發給內部網路的伺服器上進行處理,達到一個負載均衡的目的 2. 使用消息系統,將更多的請求使用中間件「快取」起來,再從這個系統中不斷的取到快取的請求,進行進一步的處理。
後者使用到的消息系統,就是kafka 的一個使用場景。
那麼什麼是 kafka?
kafka 是一個分散式消息系統,目前已定位為分散式流式處理平台。
簡單的說一個系統A 將消息發給消息系統,一個系統B 再從消息系統中取到消息,進行後續的處理。
常見的用來描述 kafka 應用場景的一個詞是:削峰填谷,削減波峰流量,填充波谷流量,使系統盡量的平滑。
由此得處:kafka 的三個典型應用場景
- 消息系統
- 存儲系統
- 分散式流式處理平台
消息系統是目前最廣泛的應用;消息傳輸需要存儲起來,供後續系統拉取,故也可以當作存儲系統;拉取消息之後,其實也是供後續系統處理,那麼為什麼不把數據處理也包含再kafka 系統中?分散式流式處理平台,大概就是這個意思。
下文陳述最核心的應用:消息系統
2. 基本概念
一條消息由系統A 產生,發往消息系統,系統B 從消息系統中拉取,這其中涉及到很多的概念。
- 系統A 稱為生產者 producer,目的是發送消息
- 消息系統稱為 broker,本質是服務進程目的是接受生產者的消息、消費者的消息拉取請求、持久化
- 系統B 稱為消費者 consumer, 目的是拉取消息系統中的消息
針對生產者、消費者有不同的設置參數,決定了生產者、消費者的不同行為。
生產者要發送消息,首先要知道發往何處,即要知道 broker 的地址,知道 broker 的地址,broker(kafka server) 的設置約束了持久化存儲的地址及其他行為,除此之外,如何區分發的消息的類型不同呢?kafka 系統給這個區分消息的概念取了個邏輯概念:Topic , 即生產者指定的 Topic 不同,存儲的地址就不同。
針對 Topic,簡單的場景是,不斷的往裡面發內容,持久化存儲就不斷以追加的模式存儲,簡單場景沒什麼問題,問題是消息數據過多的話,不利於系統消費,很簡單的想法,分不同的「文件」追加存儲,把整體規模縮小,這個概念在 kafka 中稱之為分區:partition. 消息可以不斷的以追加的模式不斷的發往分區內,分區有編號,起始位 0 ,消息追加模式存儲在分區內,會給一個編號 offset
消費者從 broker 系統中拉取消息,首先要知道broker 地址,其次需要知道 Topic,更細化的還可以設置哪個分區,哪個偏移量 offset 開始,消費消息。
那消息萬一丟了咋整?一個簡單的做法就是冗餘備份:Replication,多份備份,其中有一個是 Leader , 其他的是 follower, leader 的作用是和消息對接,follower 不直接和消息對接,只負責和 leader 對接,不斷的同步數據。
多個 broker 構成 kafka 集群,萬一一個掛了 kafka 系統依靠 zookeeper 進行重新選舉產生新leader。
kafka cluster:
image
kafka topic: 分區概念
image
kafka 集群:
image
3. 客戶端使用
基於上述概念:那麼如何構建一個Kafka 服務,完成消息系統呢?
- 啟動服務進程:broker
偽程式碼:
type Broker struct{ Addr Config ... }
- 生產者連接 broker
偽程式碼:
type Producer struct{ Config Message ... }
- 消費者連接 broker
偽程式碼
type Consumer strcut{ Config Topic Partitions Offset ... }
基本的思路:
- 啟動kafka服務
- 系統A 連接服務,發送消息
- 系統B 連接服務,消費消息
結合官網的示例:如何完成最基本的消息收發。
下載安裝包:kafka_2.12-2.3.0.tgz
- 2.12 指編譯器版本
- 2.3.0 指kafka 版本
解壓之後,最重要的有兩目錄:
- bin : 一系列的腳本,比如啟動 zookeeper 服務,創建 topic,生產者生產消息,消費者消費消息等
zookeeper-server-start.sh zookeeper-server-stop.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-topics.sh kafka-server-start.sh kafka-server-stop.sh ...
- config: 配置文件:比如配置 zookeeper 埠,配置kafka 日誌存儲目錄、對外埠,消息最大容量,保存時常等
zookeeper.properties server.properties producer.properties consumer.properties ...
大概200多個參數吧,不好意思,我記不住。那怎麼辦?不學了嗎,那掙不了錢,漲不了工資啊。
基本默認設置,部分按分類設置:
- zookeeper.properties
kafka 依賴於 zookeeper 分散式協調
dataDir=/tmp/zookeeper clientPort=2181
記住這個默認的 clientPort=2181
- server.properties
kafka server 服務
log.dirs=/tmp/kafka-logs //日誌存儲目錄 log.retention.hours=168 // 日誌存儲時長 broker.id=0 // 默認 broker id,集群方式的 kafka 設置,給每個 broker 編號 listeners=PLAINTEXT://:9092 // 對外提供的服務入口地址 zookeeper.connect=localhost:2181 // ZooKeeper集群地址 ...
- producer.properties
約定消息等的內容
- consumer.properties
約定消費消息等的內容
配置好配置參數後:
- 啟動 zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
- 啟動 kafka 服務進程
> bin/kafka-server-start.sh config/server.properties
創建topic, 查詢 topic 等可以使用:kafka-topics.sh
生產者生產消息可以使用:kafka-console-producer.sh
消費者消費消息可以使用:kafka-console-consumer.sh
當然,這些操作,一般只供測試使用,實際的使用是使用對應變成語言的客戶端。
4. 演示
kafka go版本客戶端:
下載安裝:
go get -u -v github.com/Shopify/sarama
4.1 生產者
系統 A
- 生產者
type KafkaAction struct { DataSyncProducer sarama.SyncProducer DataAsyncProducer sarama.AsyncProducer }
// 同步方式 func newDataSyncProducer(brokerList []string) sarama.SyncProducer { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message config.Producer.Retry.Max = 5 // Retry up to 10 times to produce the message config.Producer.Return.Successes = true config.Producer.Partitioner = sarama.NewRoundRobinPartitioner producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { log.Fatalln("Failed to start Sarama producer1:", err) } return producer }
// 非同步方式 func newDataAsyncProducer(brokerList []string) sarama.AsyncProducer { config := sarama.NewConfig() sarama.Logger = log.New(os.Stdout, "[KAFKA] ", log.LstdFlags) config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack config.Producer.Compression = sarama.CompressionSnappy // Compress messages config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms config.Producer.Partitioner = sarama.NewRoundRobinPartitioner producer, err := sarama.NewAsyncProducer(brokerList, config) if err != nil { log.Fatalln("Failed to start Sarama producer2:", err) } go func() { for err := range producer.Errors() { log.Println("Failed to write access log entry:", err) } }() return producer }
還記得生產者有一系列配置參數嗎?config 就這這個作用,有默認值,可以自己設置對應的值。
比如:壓縮演算法
config.Producer.Compression = sarama.CompressionSnappy
常用的壓縮演算法有:
- gzip
- snappy
- lz4
- zstd
不同的壓縮演算法主要在壓縮比和吞吐量不同。
比如分區規則
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
常用的分區規則:
- 輪詢機制
- 隨機分區
- 按 key 分區
比如:發送消息是否返回成功與否
onfig.Producer.RequiredAcks = sarama.WaitForLocal
- 消息:生產者只傳遞位元組組數據。
介面
type Encoder interface { Encode() ([]byte, error) Length() int }
發送的消息需要實現Encoder 介面,即定義的消息結構體需要實現 Encode 和 Length 方法。
type SendMessage struct { Method string `json:"method"` URL string `json:"url"` Value string `json:"value"` Date string `json:"date"` encoded []byte err error } func (S *SendMessage) Length() int { b, e := json.Marshal(S) S.encoded = b S.err = e return len(string(b)) } func (S *SendMessage) Encode() ([]byte, error) { return S.encoded, S.err }
- 發送消息
func (K *KafkaAction) Do(v interface{}) { message := v.(SendMessage) // 發送的消息返回分區和偏移量 partition, offset, err := K.DataSyncProducer.SendMessage(&sarama.ProducerMessage{ Topic: TOPIC, Value: &message, }) if err != nil { log.Println(err) return } value := map[string]string{ "method": message.Method, "url": message.URL, "value": message.Value, "date": message.Date, } fmt.Println(fmt.Sprintf("/%d/%d/%+v", partition, offset, value)) }
比如我們按照上面的配置發送消息:topic: topic-golang partition/offset/value
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/2/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/3/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
上文只有一個 partition , offset值不斷增加。
創建另外一個 topic, 分10個區。topic: topic-python
在日誌中顯示成咋樣的呢?
// cd log.dirs ; server.properties 中的設置 topic-golang-0 topic-python-0 topic-python-1 topic-python-2 topic-python-3 topic-python-4 topic-python-5 topic-python-6 topic-python-7 topic-python-8 topic-python-9
往 topic-python 中發送日誌,分區規則輪詢:
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
輪詢,不斷的往分區記憶體消息。
4.2 消費者
系統 B
func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true brokers := []string{"127.0.0.1:9092"} master, err := sarama.NewConsumer(brokers, config) if err != nil { panic(err) } defer func() { if err := master.Close(); err != nil { panic(err) } }() _, e := master.Partitions("topic-python") if e != nil { log.Println(e) } consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetOldest) if err != nil { panic(err) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) doneCh := make(chan struct{}) go func() { for { select { case err := <-consumer.Errors(): fmt.Println(err) case msg := <-consumer.Messages(): fmt.Println("Received messages", string(msg.Key), string(msg.Value), msg.Topic) case <-signals: fmt.Println("Interrupt is detected") doneCh <- struct{}{} } } }() <-doneCh }
- 消費者指定了 topic: topic-python
- 消費者指定了 partition: 0
還記得生產者向 topic-python 內發送的消息嗎? partition/offset/value
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
可以看出:partition: 0 中有兩條消息。那麼消費者指定了分區,只能消費這兩條消息。
Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
4.3 其他
使用 kafka 客戶端 ,那麼我們還需要哪些功能?
- 關於 Topic 的創建、描述、刪除等
- 消費者組描述等
- 元資訊:metadata
type ClusterAdmin interface { CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error ListTopics() (map[string]TopicDetail, error) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) DeleteTopic(topic string) error CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error DeleteRecords(topic string, partitionOffsets map[int32]int64) error DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error CreateACL(resource Resource, acl Acl) error ListAcls(filter AclFilter) ([]ResourceAcls, error) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) ListConsumerGroups() (map[string]string, error) DescribeConsumerGroups(groups []string) ([]*GroupDescription, error) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) DeleteConsumerGroup(group string) error DescribeCluster() (brokers []*Broker, controllerID int32, err error) Close() error }
關於單節點 kafka 的基本應用就這些。
5. 容器服務
任何提供服務的系統,都可以使用容器版本,kafka 也可以使用容器版本。配置可以使用環境變數的形式設置。
docker-compose.yml
version: '2' services: ui: image: index.docker.io/sheepkiller/kafka-manager:latest depends_on: - zookeeper ports: - 9000:9000 environment: ZK_HOSTS: zookeeper:2181 zookeeper: image: index.docker.io/wurstmeister/zookeeper:latest ports: - 2181:2181 server: image: index.docker.io/wurstmeister/kafka:latest depends_on: - zookeeper ports: - 9092:9092 environment: KAFKA_OFFSETS_TOPIC_REPLIATION_FACTOR: 1 KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- zookeeper 分散式協調系統
- kafka server Kafka 服務
- kafka-manager kafka 管理平台
後續集群版本。
<完>
程式碼:https://github.com/wuxiaoxiaoshen/go-thirdparty