Confluent之Kafka Connector初體驗
概述
背景
Apache Kafka 是最大、最成功的開源項目之一,可以說是無人不知無人不曉,在前面的文章《Apache Kafka分散式流處理平台及大廠面試寶典》我們也充分認識了Kafka,Apache Kafka 是LinkedIn 開發並開源的,LinkedIn 核心理念之一數據驅動主要有兩點領悟,其一是強調動態數據有效性要遠遠大於靜態數據,何為動態數據和靜態數據,可以簡單理解靜態數據則為我們目前基於各種各樣的資料庫或文件系統等存儲系統,而動態數據是基於事件驅動的理念如現在主流Apache Kafka和Apache Pulsar。其二是要利用所有數據化資訊而不僅僅是交易核心數據,非交易核心數據量比交易核心數據量往往要大100倍都不止。作為當時開發出Apache Kafka實時消息流技術的幾個團隊成員出來創業,其中Jay Kreps帶頭創立了新公司Confluent,還有饒軍也是Confluent聯合創始人之一,而Confluent的產品基本都是圍繞著Kafka來做的。基於動態數據理念是Apache Kafka 到Confluent企業化之路的主引線。
Confluent公司
Confluent 開創了一個新的數據基礎設施,專註於他們動態數據 —— 本質上是數據流(流動中的數據),適合實時數據,公司的使命是「讓數據動起來」。Confluent 以 Kafka 為核心,是將其商業化最成功的領先獨立公司之一。對數據的集成和連接進行實時操作,如果要真正利用Kafka,使用Confluent是目前最佳事先方案。 官網主要產品如下
而核心的商業化產品為Confluent Platform(可以在本地進行部署)和Confluent Cloud(託管在雲中,並在混合雲基礎設施的環境中工作)。
- Confluent Platform是一個流數據平台,實現數據流集成,能夠組織管理來自不同數據源的數據,擁有穩定高效的系統。Confluent Platform 很容易的建立實時數據管道和流應用。通過將多個來源和位置的數據集成到公司一個中央數據流平台,Confluent Platform使您可以專註於如何從數據中獲得商業價值而不是擔心底層機制,如數據是如何被運輸或不同系統間摩擦。具體來說,Confluent Platform簡化了連接數據源到Kafka,用Kafka構建應用程式,以及安全,監控和管理您的Kafka的基礎設施。
Confluent Platform又分為三塊
提供了KafkaSteams和KsqlDB企業級Kafka應用特性
Confluent三要素
- 完整:利用開源 Apache Kafka 的功能和我們重要的專有功能,為動態數據創建了一個完整的平台。使用特定工具(例如 ksqlDB)並行移動和處理數據,ksqlDB 是一種原生的動態數據資料庫,允許用戶僅使用幾條 SQL 語句以及 100 多個連接器來構建動態數據應用程式。
- 無處不在:已經構建了一個真正的混合和多雲產品。可以在客戶的雲和多雲環境中為他們提供支援,本地,或兩者的結合。從一開始就認識到雲之旅不是一蹴而就的,使用戶有效地進行數字化轉型,基本的動態數據平台,可以在整個技術環境中無縫集成。
- 雲原生。 Confluent 為動態數據提供真正的雲功能。 提供完全託管的雲原生服務,該服務具有大規模可擴展性、彈性、安全性和全球互聯性,可實現敏捷開發。這與採用內部軟體並簡單地在雲虛擬機上提供它所產生的體驗完全不同。使用 Confluent,開發人員和企業都可以專註於他們的應用程式並推動價值,而無需擔心管理數據基礎設施的運營開銷。
存儲方式採用雲原生的架構,和Apache Pulsar一樣採用存儲和計算分離的架構,可以很方便實現分散式擴容,存儲資源不夠單獨擴容存儲,計算資源不夠單獨擴容計算。
Kafka Connect
概述
Kafka Connect是Apache Kafka的一個組件,用於執行Kafka和其他系統之間的流集成,比如作為用來將Kafka與資料庫、key-value存儲系統、搜索系統、文件系統等外部系統連接起來的基礎框架。通過使用Kafka Connect框架以及現有的連接器可以實現從源數據讀入消息到Kafka,再從Kafka讀出消息到目的地的功能。Kafka Connect可以很容易地將數據從多個數據源流到Kafka,並將數據從Kafka流到多個目標。Kafka Connect有上百種不同的連接器。Confluent 在 Kafka connect基礎上實現了上百種不同的connector連接器免費讓大家使用,而且Confluent
在GitHub
上提供了源碼,可以根據自身業務需求進行修改。其中最流行的有:
- RDBMS (Oracle, SQL Server, DB2, Postgres, MySQL)
- Cloud Object stores (Amazon S3, Azure Blob Storage, Google Cloud Storage)
- Message queues (ActiveMQ, IBM MQ, RabbitMQ)
- NoSQL and document stores (Elasticsearch, MongoDB, Cassandra)
- Cloud data warehouses (Snowflake, Google BigQuery, Amazon Redshift)
可以自己運行Kafka Connect,或者利用Confluent Cloud中提供的眾多託管連接器來提供一個完全基於雲的集成解決方案。除了託管連接器外,Confluent還提供了完全託管的Apache Kafka、Schema Registry和ksqlDB。
- Schema-Registry是為元數據管理提供的服務,同樣提供了RESTful介面用來存儲和獲取schemas,它能夠保存數據格式變化的所有版本,並可以做到向下兼容。Schema-Registry還為Kafka提供了Avro格式的序列化插件來傳輸消息。Confluent主要用Schema-Registry來對數據schema進行管理和序列化操作。
Kafka Connect使用
Kafka Connect運行在自己的進程中,獨立於Kafka broker。它是分散式的、可伸縮的、容錯的,就像Kafka本身一樣。使用Kafka Connect不需要編程,因為它只由JSON配置驅動。這使得它可以被廣泛的用戶使用。除了接收和輸出數據,Kafka Connect還可以在數據通過時執行輕量級的轉換。
Kafka Connect把數據從另一個系統流到Kafka,或者把數據從Kafka流到其他地方,下面是一些使用Kafka Connect的常見方式:
- 流數據管道:Kafka Connect可以用來從一個源(如事務資料庫)獲取實時事件流,並將其流到目標系統進行分析。因為Kafka存儲每個數據實體(topic)的可配置時間間隔,所以可以將相同的原始數據流到多個目標。這可以是為不同的業務需求使用不同的技術,也可以是將相同的數據提供給業務中的不同領域(業務中有自己的系統來保存數據)。
- 從應用程式寫入數據存儲:在應用程式中,可以創建想要寫入目標系統的數據。可以是寫入文檔存儲的一系列日誌事件或是持久化到關係資料庫的數據。通過將數據寫入Kafka,並使用Kafka Connect負責將數據寫入目標達到簡化記憶體佔用。
- 從舊系統到新系統的演化過程:在NoSQL存儲、事件流平台和微服務等最新技術出現之前,關係資料庫(RDBMS)實際上是應用程式中所有數據寫入的地方。RDBMS在我們構建的系統中仍然扮演著非常重要的角色——但並不總是如此。有時我們會想使用Kafka作為獨立服務之間的消息代理以及永久的記錄系統。這兩種方法非常不同,但與過去的技術變革不同,兩者之間是無縫銜接的。通過使用變更數據捕獲(CDC),我們可以近乎實時地從資料庫中提取每一個INSERT, UPDATE,甚至DELETE到Kafka事件流中。CDC對源資料庫的影響非常小,這意味著現有應用程式可以繼續運行(並且不需要對其進行更改),同時可以由從資料庫捕獲的事件流驅動構建新的應用程式。當原始的應用程式在資料庫中記錄一些東西時(例如,下單),任何在Kafka中訂閱事件流的應用程式都將能夠基於這些事件採取行動(例如,一個新的訂單服務)
- 使現有的系統處理實時:許多組織在他們的資料庫中都有靜止的數據,比如Postgres, MySQL或Oracle,並且可以使用Kafka Connect從現有的數據中獲取價值,並將其轉換為事件流並實現數據驅動分析。
任何Kafka Connect管道的關鍵組件都是連接器。Kafka Connect由社區、供應商編寫,或者偶爾由用戶訂製編寫,它將Kafka Connect與特定的技術集成在一起。例如:
- Debezium MySQL源連接器使用MySQL bin日誌從資料庫讀取事件,並將這些事件流到Kafka Connect。
- Elasticsearch接收器連接器從Kafka Connect獲取數據,並使用Elasticsearch api,將數據寫入Elasticsearch。
- Confluent的S3連接器既可以作為源連接器,也可以作為接收連接器,將數據寫入S3或將其讀入。
Kafka Connect部署方式
Connector和任務是邏輯工作單元,並作為流程運行。這個進程在Kafka Connect中被稱為worker。Kafka Connect worker可以在兩種部署方式中運行:獨立的或分散式的。
- 布式模式(推薦):在多台機器(節點)上連接工人。它們形成一個Connect集群。Kafka Connect在集群中分布正在運行的連接器。您可以根據需要添加或刪除更多節點。連接器是通過Kafka Connect提供的REST API創建和管理的。可以容易地添加額外的worker,從Kafka Connect集群中添加worker時,任務會在可用的worker之間重新平衡,以分配工作負載;如果縮減集群或者worker崩潰了Kafka Connect將再次重新平衡,以確保所有連接器任務仍然被執行。建議的最小worker數量是2個,分散式模式也具有更強的容錯能力。如果一個節點意外離開集群,Kafka Connect會自動將該節點的工作分配給集群中的其他節點。並且,因為Kafka Connect將連接器的配置、狀態和偏移量資訊存儲在Kafka集群中,在這裡它被安全地複製,所以失去一個Connect worker運行的節點不會導致任何數據的丟失.由於可伸縮性、高可用性和管理優勢,建議在生產環境中採用分散式模式。如果您有多個worker同時在一台機器上運行,了解資源限制(CPU和記憶體)。從默認堆大小設置開始,監視內部指標和系統。檢查CPU、記憶體和網路(10GbE或更高)是否滿足負載要求。
- 獨立模式(單機模式):在獨立模式下,Kafka Connect worker使用文件存儲來保存它的狀態。連接器是從本地配置文件創建的,而不是REST API。因此,您不能將worker集群在一起,不能擴展吞吐量,也沒有容錯能力。常用於在本地機器上開發和測試Kafka Connect非常有用。它也可以用於通常使用單個代理的環境(例如,發送web伺服器日誌到Kafka)
安裝連接插件
-
Kafka Connect只有一個必要的先決條件來啟動;也就是一組Kafka broker。這些Kafka代理可以是早期的代理版本,也可以是最新的版本。雖然Schema Registry不是Kafka Connect的必需服務,但它可以讓你輕鬆地使用Avro, Protobuf和JSON Schema作為通用的數據格式來讀取和寫入Kafka記錄。這使編寫自定義程式碼的需求最小化,並以靈活的格式標準化數據。
-
Kafka Connect被設計成可擴展的,所以開發者可以創建自定義的連接器、轉換或轉換器,用戶可以安裝和運行它們。在一個Connect部署中安裝許多插件是很常見的,但需要確保每個插件只安裝一個版本。
-
Kafka Connect插件可以是:文件系統上的一個目錄,包含所有需要的JAR文件和插件的第三方依賴項。這是最常見和首選的。一個包含插件及其第三方依賴的所有類文件的JAR。一個插件永遠不應該包含任何由Kafka Connect運行時提供的庫。
-
Kafka Connect使用一個定義為逗號分隔的插件目錄路徑列表來查找插件。worker路徑配置屬性配置如下:plugin.path=/usr/local/share/kafka/plugins。安裝插件時,將插件目錄或JAR(或一個解析到這些目錄之一的符號鏈接)放置在插件路徑中已經列出的目錄中。或者可以通過添加包含插件的目錄的絕對路徑來更新插件路徑。使用上面的插件路徑的例子,可以在每台運行Connect的機器上創建一個/usr/local/share/kafka/plugins目錄,然後把插件目錄(或者超級jar)放在那裡。
基本概念
- Connectors:通過管理task來協調數據流的高級抽象
- Kafka Connect中的connector定義了數據應該從哪裡複製到哪裡。connector實例是一種邏輯作業,負責管理Kafka與另一個系統之間的數據複製。
- 我們在大多數情況下都是使用一些平台提供的現成的connector。但是,也可以從頭編寫一個新的connector插件。在高層次上,希望編寫新連接器插件的開發人員遵循以下工作流。
- Tasks:如何將數據複製到Kafka或從Kafka複製數據的實現
- Task是Connect數據模型中的主要處理數據的角色,也就是真正幹活的。每個connector實例協調一組實際複製數據的task。通過允許connector將單個作業分解為多個task,Kafka Connect提供了內置的對並行性和可伸縮數據複製的支援,只需很少的配置。
- 這些任務沒有存儲任何狀態。任務狀態存儲在Kafka中的特殊主題
config.storage.topic
和status.storage.topic
中。因此,可以在任何時候啟動、停止或重新啟動任務,以提供彈性的、可伸縮的數據管道。 - Task Rebalance
- 當connector首次提交到集群時,workers會重新平衡集群中的所有connector及其tasks,以便每個worker的工作量大致相同。當connector增加或減少它們所需的task數量,或者更改connector的配置時,也會使用相同的重新平衡過程。
- 當一個worker失敗時,task在活動的worker之間重新平衡。當一個task失敗時,不會觸發再平衡,因為task失敗被認為是一個例外情況。因此,失敗的task不會被框架自動重新啟動,應該通過REST API重新啟動。
- Workers:執行Connector和Task的運行進程。
- Converters: 用於在Connect和外部系統發送或接收數據之間轉換數據的程式碼。在向Kafka寫入或從Kafka讀取數據時,Converter是使Kafka Connect支援特定數據格式所必需的。task使用Converters將數據格式從位元組轉換為連接內部數據格式,反之亦然。並且Converter與Connector本身是解耦的,以便在Connector之間自然地重用Converter。默認提供一些Converters:
- AvroConverter(建議):與Schema Registry一起使用
- JsonConverter:適合結構數據
- StringConverter:簡單的字元串格式
- ByteArrayConverter:提供不進行轉換的「傳遞」選項
- Transforms:更改由連接器生成或發送到連接器的每個消息的簡單邏輯。
- Connector可以配置Transforms,以便對單個消息進行簡單且輕量的修改。這對於小數據的調整和事件路由十分方便,且可以在connector配置中將多個Transforms連接在一起。然而,應用於多個消息的更複雜的Transforms最好使用KSQL和Kafka Stream來實現。
- Transforms是一個簡單的函數,輸入一條記錄,並輸出一條修改過的記錄。Kafka Connect提供許多Transforms,它們都執行簡單但有用的修改。可以使用自己的邏輯訂製實現轉換介面,將它們打包為Kafka Connect插件,將它們與connector一起使用。
- 當Transforms與Source Connector一起使用時,Kafka Connect通過第一個Transforms傳遞connector生成的每條源記錄,第一個Transforms對其進行修改並輸出一個新的源記錄。將更新後的源記錄傳遞到鏈中的下一個Transforms,該Transforms再生成一個新的修改後的源記錄。最後更新的源記錄會被轉換為二進位格式寫入到Kafka。Transforms也可以與Sink Connector一起使用。
使用示例
Kafka安裝目錄
在Kafka安裝目錄下也已自帶Connect腳本和簡單模板配置文件,而Confluent Platform中也會包含了更多worker示例配置屬性文件(如etc/schema-registry/connect-avro-distributed.properties、etc/schema-registry/connect-avro-standalone.properties)。
官方使用說明
group.id
(默認connect-cluster
) – Connect cluster group使用唯一的名稱;注意這不能和consumer group ID(消費者組)衝突。config.storage.topic
(默認connect-configs
) – topic用於存儲connector和任務配置;注意,這應該是一個單個的partition,多副本的topic。你需要手動創建這個topic,以確保是單個partition(自動創建的可能會有多個partition)。offset.storage.topic
(默認connect-offsets
) – topic用於存儲offsets;這個topic應該配置多個partition和副本。status.storage.topic
(默認connect-status
) – topic 用於存儲狀態;這個topic 可以有多個partitions和副本
建議允許Kafka Connect自動創建內部管理的Topics。如果需要手動創建示例如下
# config.storage.topic=connect-configs
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
插件下載
Confluent已提供很多Kafka Connect的實現的Connector連接器,可以先到這裡搜索插件下載Confluent Hub插件下載地址 //www.confluent.io/hub/
獨立模式使用
這裡不做演示,獨立模式通常用於開發和測試,或者輕量級、單代理環境(例如,發送web伺服器日誌到Kafka)。單機模式啟動worker的示例命令:
bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
分散式模式
REST API
Kafka Connect是作為一個服務運行的,支援REST API來管理連接器。默認情況下,該服務運行在埠8083上。當在分散式模式下執行時,REST API將成為集群的主要介面;你可以向任何集群發出請求REST API會自動轉發請求。目前REST API只支援application/json作為請求和響應實體內容類型。請求通過HTTP Accept頭指定預期的響應內容類型和或者提交請求content – type報頭指定請求實體的內容類型
Accept: application/json
Content-Type: application/json
-
GET /
– 頂級(根)請求,獲取服務REST請求的Connect worker的版本,源程式碼的git提交ID,以及該worker連接的Kafka集群ID -
GET /connectors
– 返回活躍的connector列表 -
POST /connectors
– 創建一個新的connector;請求的主體是一個包含字元串name欄位和對象config欄位(connector的配置參數)的JSON對象。 -
GET /connectors/{name}
– 獲取指定connector的資訊 -
GET /connectors/{name}/config
– 獲取指定connector的配置參數 -
PUT /connectors/{name}/config
– 更新指定connector的配置參數 -
GET /connectors/{name}/status
– 獲取connector的當前狀態,包括它是否正在運行,失敗,暫停等。 -
GET /connectors/{name}/tasks
– 獲取當前正在運行的connector的任務列表。 -
GET /connectors/{name}/tasks/{taskid}/status
– 獲取任務的當前狀態,包括是否是運行中的,失敗的,暫停的等, -
PUT /connectors/{name}/pause
– 暫停連接器和它的任務,停止消息處理,直到connector恢復。 -
PUT /connectors/{name}/resume
– 恢復暫停的connector(如果connector沒有暫停,則什麼都不做) -
POST /connectors/{name}/restart
– 重啟connector(connector已故障) -
POST /connectors/{name}/tasks/{taskId}/restart
– 重啟單個任務 (通常這個任務已失敗) -
DELETE /connectors/{name}
– 刪除connector, 停止所有的任務並刪除其配置 -
GET /connector-plugins
– 返回已在Kafka Connect集群安裝的connector plugin
列表。請注意,API僅驗證處理請求的worker的connector。這以為著你可能看不不一致的結果,特別是在滾動升級的時候(添加新的connector jar) -
PUT /connector-plugins/{connector-type}/config/validate
– 對提供的配置值進行驗證,執行對每個配置驗證,返回驗證的建議值和錯誤資訊。
本地文件輸入寫入HDFS示例
# FileStreamSource是Kafka本身就提供Connector實現,直接配置使用即可
# 將confluentinc-kafka-connect-hdfs-10.1.6.zip解壓到指定插件目錄,我這裡在Kafka安裝目錄下的config/connect-distributed.properties配置項
plugin.path=/opt/kafka/connectors
vim /home/commons/kafka_2.13-3.1.0/config/connect-distributed.properties
# Broker Server的訪問ip和埠號
bootstrap.servers=server1:9092,server2:9092,server3:9092
# 指定集群id
group.id=connect-cluster
# 指定rest服務的埠號
rest.port=8083
# 指定Connect插件包的存放路徑
plugin.path=/opt/kafka/connectors
# 配置轉換器
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# 在Kafka根目錄下啟動worker
./bin/connect-distributed.sh -daemon config/connect-distributed.properties
創建FileStreamSource的json文件filesource.json
{
"name":"local-file-source",
"config":{
"topic":"distributetest",
"connector.class":"FileStreamSource",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.value.converter":"org.apache.kafka.connect.storage.StringConverter",
"file":"/home/commons/kafka_2.13-3.1.0/distribute-source.txt"
}
}
創建HdfsSinkConnector的json文件hdfssink.json
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "distributetest",
"topics.dir": "/user/hive/warehouse/topictest/",
"hdfs.url": "hdfs://192.168.3.14:8020",
"flush.size": "2"
}
}
curl -X GET //localhost:8083/connector-plugins
# 創建filesource和hdfssink
curl -X POST -H 'Content-Type: application/json' -i '//localhost:8083/connectors' [email protected] -w '\n'
curl -X POST -H 'Content-Type: application/json' -i '//localhost:8083/connectors' [email protected] -w '\n'
# 寫入數據,消費kafka主題也訂閱到相應的數據
echo "{"name":"zhangsan"}" >> distribute-source.txt
echo "{"name":"zhangsan1"}" >> distribute-source.txt
echo "{"name":"zhangsan2"}" >> distribute-source.txt
curl -X GET -H 'Content-Type: application/json' -i '//localhost:8083/connectors/hdfs-sink/status' -w '\n'
查看HDFS中也已經有相應的數據,如果需要更詳細的數據可以從官方查找使用,後續我們再詳細介紹更多實際使用場景
**本人部落格網站 **IT小神 www.itxiaoshen.com