這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,部落客出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買部落客的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學影片。
Kafka是一個分布表示實時數據流平台,可獨立部署在單台伺服器上,也可部署在多台伺服器上構成集群。它提供了發布與訂閱的功能,用戶可以發送數據到Kafka集群中,也可以從Kafka集群中讀取數據。之前在Kafka 2.8.0版本時,Kafka社區提出了KRaft協議的概念,現在社區發布了Kafka 3.0,裡面涉及優化和新增了很多功能,其中就包含KRaft協議的改機。今天,筆者就給大家介紹一下Kafka 3.0新增了哪些特性以及優化了哪些功能。
在 Kafka 3.0 中包含了許多重要的新功能,其中比較顯著的變化如下所示:
在Kafka 3.0中,社區對於Zookeeper的版本已經升級到3.6.3了,其中我們可以預覽 KRaft 模式,但是無法從 2.8 或者更早的版本升級到該模式。許多實現依賴 jar 現在在運行時類路勁中可用,而不是在編譯和運行時類路勁中。升級後的編譯錯誤可以通過顯示添加缺少的依賴 jar 或更新應用程式以不使用內部類來修復。
消費者配置的默認值 session.timeout.ms 從10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已經被啟用。兩種配置的值始終假定為 3.0 或者更高,通過 inter.broker.protocol.version 來配置。如果設置了 log.message.format.version 或者 message.format.version 建議在升級到 3.0的同時清理掉這兩個屬性,同時設置 inter.broker.protocol.version 值為 3.0 。
Streams API 刪除了在 2.5.0 或者更早版本中棄用的所有棄用 API,Kafka Streams 不再對「connect:json」模組有編譯時的依賴,依賴此傳遞依賴項的項目必須明確聲明它。
現在,通過指定的自定義主體構建起實現 principal.builder.class 現在必須實現 KafkaPrincipalSerde 介面以允許Broker 之間的轉發。另外,一些過時的類,方法和工具以及從clients、connect、core、和tools模組進行了刪除。
該Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被棄用。請使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)來替換,ConsumerGroupMetadata 可以通過檢索KafkaConsumer#groupMetadata()更強大的語義。需要注意的是,完整的消費者組元數據集只有 Brokers 或 2.5 或更高版本才能支援,因此你必須升級你的 Kafka 集群以獲得更強的語義。否則,你可以通過new ConsumerGroupMetadata(consumerGroupId)與較老版本的Broker進行交互。
連接器中 internal.key.converter 和 internal.value.converter 屬性已被完全刪除。自版本 2.0.0 起,不推薦使用這些 Connect 工作器屬性。現在被硬編碼為使用 schemas.enable 設置為的 JSON 轉換器false。如果你的集群一直在使用不同的內部鍵或值轉換器,你可以按照官網文檔中概述的遷移步驟,將你的 Connect 集群安全地升級到 3.0。 基於 Connect 的 MirrorMaker (MM2) 包括對支援的更改IdentityReplicationPolicy,無需重命名 Topic 即可啟用複製。DefaultReplicationPolicy默認情況下仍然使用現有的,但可以通過 replication.policy 配置屬性啟用身份複製 。這對於從舊版 MirrorMaker (MM1) 遷移的用戶,或者對於不希望 Topic 重命名的具有簡單單向複製拓撲的用例特別有用。請注意IdentityReplicationPolicy與 DefaultReplicationPolicy 不同,無法根據 Topic 名稱阻止複製循環,因此在構建複製拓撲時要注意避免循環。
雖然 internal.key.converter 和 internal.value.converter 中 Connect 工作器屬性,以及以這些名稱為前綴的所有屬性都已棄用,但是有時候用戶仍會嘗試使用這些屬性進行調試,在與未棄用的Key 和 Value轉化器相關的屬性意外混淆後,或者只是對其進行盲目的配置後,進行調試。這些實驗的結果可能會產生不好的後果,配置了新內保轉換器卻無法讀取具有較舊內部轉換器的內保 Topic 數據,這最多會導致偏移量和連機器配置的丟失。
以下連接屬性會將被刪除:
Connect 的行為就好像上面沒有提供一樣。具體來說,對於它的鍵和值轉換器,它將使用開箱即用的 JsonConverter,配置為 schemas.enable 屬性值為 false 。
運行未使用JsonConverter 並對 schemas.enable 設置 false 的 Connect 集群用戶,可以按照以下步驟將其 Connect 集群升級到 3.0:
在本次 Kafka 3.0 版本中新增了以下功能:
之前在核心 Kafka 產品中引入了 Headers,在 Kafka Connect Framework 中公開它們將是有利的。Kafka 的 Header 是帶有二進位值的簡單名稱,而 Connect API 已經有一個非常有用的層來處理不同類型的數據。Connect 的 Header 支援應該使用像 Kafka 這樣的字元串名稱,但使用與 Connect 記錄鍵和值相同的類型來表示值。這將提供與 Connect 框架的其餘部分的一致性,並使連接器和轉換能夠輕鬆地訪問、修改和創建記錄上的 Header。
Kafka 將 Header 定義為具有字元串名稱和二進位值,但 Connect 將使用用於記錄鍵和值的相同機制來表示 Header 值。每個 Header 值可能有一個對應的 Schema,允許連接器和轉換以一致的方式處理 Header 值、記錄鍵和記錄值。Connect 將定義一種 HeaderConverter 機制以類似於Converter框架的方式序列化和反序列化標頭值 ,這樣現有的 Converter實現也可以實現 HeaderConverter. 由於來自不同供應商的連接器和轉換可能被組合到單個管道中,因此不同的連接器和轉換可以輕鬆地將 Header 值從原始形式轉換為連接器和/或轉換期望的類型,這一點很重要。
注意:
為了簡潔和清晰,顯示的程式碼不包括 JavaDoc,但提議的更改確實包括所有公共 API 和方法的 JavaDoc。
org.apache.kafka.connect.Header 將添加一個新介面並用作記錄上單個標頭的公共 API。該介面為鍵、值和值的模式定義了簡單的 getter。這些是不可變對象,還有一些方法可以創建Header具有不同名稱或值的新對象。程式碼片段如下所示:
package org.apache.kafka.connect.header; public interface Header { // Access the key and value String key(); // never null Schema schema(); // may be null Object value(); // may be null // Methods to create a copy Header with(Schema schema, Object value); Header rename(String key); }
org.apache.kafka.connect.Headers 還將添加一個新介面並用作記錄標題有序列表的公共 API。這是在 Kafka 客戶端的 org.apache.kafka.common.header.Headers介面之後作為標題的有序列表進行模式化的,其中允許多個具有相同名稱的標題。Connect Headers介面定義了Header按順序和/或按名稱訪問各個 對象以及獲取有關Header對象數量的資訊的方法 。它還定義了Header使用各種簽名來添加、刪除和保留 對象的方法,這些簽名將易於連接器和轉換使用。由於多個Header對象可以具有相同的名稱,因此轉換需要一種簡單的方法來修改和/或刪除現有Header對象, apply(HeaderTransform) 並且apply(String, HeaderTransform) 方法可以輕鬆使用自定義 lambda 函數來執行此操作。程式碼片段如下所示:
package org.apache.kafka.connect.header; public interface Headers extends Iterable<Header> { // Information about the Header instances int size(); boolean isEmpty(); Iterator<Header> allWithName(String key); Header lastWithName(String key); // Add Header instances to this object Headers add(Header header); Headers add(String key, SchemaAndValue schemaAndValue); Headers add(String key, Object value, Schema schema); Headers addString(String key, String value); Headers addBoolean(String key, boolean value); Headers addByte(String key, byte value); Headers addShort(String key, short value); Headers addInt(String key, int value); Headers addLong(String key, long value); Headers addFloat(String key, float value); Headers addDouble(String key, double value); Headers addBytes(String key, byte[] value); Headers addList(String key, List<?> value, Schema schema); Headers addMap(String key, Map<?, ?> value, Schema schema); Headers addStruct(String key, Struct value); Headers addDecimal(String key, BigDecimal value); Headers addDate(String key, java.util.Date value); Headers addTime(String key, java.util.Date value); Headers addTimestamp(String key, java.util.Date value); // Remove and/or retain the latest Header Headers clear(); Headers remove(String key); Headers retainLatest(String key); Headers retainLatest(); // Create a copy of this Headers object Headers duplicate(); // Apply transformations to named or all Header objects Headers apply(HeaderTransform transform); Headers apply(String key, HeaderTransform transform); interface HeaderTransform { Header apply(Header header); } }
每條 Kafka 消息都包含零個或多個標頭名稱-值對,因此 Connect 記錄類將被修改為具有Headers可以就地修改的非空對象。現有的 ConnectRecord 抽象類是兩個基類 SourceRecord和 SinkRecord,並且將被改變為具有新的 headers填充欄位 ConnectHeaders對象。所有現有構造函數和方法的簽名都將保持不變以保持後向兼容性,但現有構造函數將headers使用ConnectHeaders對象填充新欄位。而且, toString(), hashCode()和 equalTo(Object)方法將改為使用新的 headers領域。
一個新的構造函數和幾個新方法將被添加到這個現有的類中,程式碼片段如下所示:
package org.apache.kafka.connect.connector; public abstract class ConnectRecord<R extends ConnectRecord<R>> { /* The following will be added to this class */ private final Headers headers; public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp); if (headers == null) { this.headers = new ConnectHeaders(); } else if (headers instanceof ConnectHeaders) { this.headers = (ConnectHeaders)headers; } else { this.headers = new ConnectHeaders(headers); } } public Headers headers() { return headers; } public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers); }
現有的 SourceRecord類將被修改以添加一個新的構造函數並實現附加 newRecord(…)方法。同樣,所有現有構造函數和方法的簽名將保持不變以保持向後兼容性。程式碼片段如下所示:
package org.apache.kafka.connect.source; public class SourceRecord extends ConnectRecord<SourceRecord> { /* The following will be added to this class */ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); this.sourcePartition = sourcePartition; this.sourceOffset = sourceOffset; } @Override public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers); } }
同樣,SinkRecord 將修改現有 類以添加新的構造函數並實現附加 newRecord(…) 方法。同樣,所有現有構造函數和方法的簽名將保持不變以保持向後兼容性。程式碼片段如下所示:
package org.apache.kafka.connect.sink; public class SinkRecord extends ConnectRecord<SinkRecord> { /* The following will be added to this class */ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset, Long timestamp, TimestampType timestampType, Iterable<Header> headers) { super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); this.kafkaOffset = kafkaOffset; this.timestampType = timestampType; } @Override public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers); } }
本次更新中添加了一個新 org.apache.kafka.connect.storage.HeaderConverter 介面,該org.apache.kafka.connect.storage.Converter介面在現有介面的基礎上進行了模式化, 但具有特定於 Header 的方法名稱和簽名。程式碼片段如下所示:
package org.apache.kafka.connect.storage; public interface HeaderConverter extends Configurable, Closeable { /** * Convert the header name and byte array value into a {@link Header} object. * @param topic the name of the topic for the record containing the header * @param headerKey the header's key; may not be null * @param value the header's raw value; may be null * @return the {@link SchemaAndValue}; may not be null */ SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value); /** * Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation. * @param topic the name of the topic for the record containing the header * @param headerKey the header's key; may not be null * @param schema the schema for the header's value; may be null * @param value the header's value to convert; may be null * @return the byte array form of the Header's value; may be null if the value is null */ byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value); /** * Configuration specification for this set of header converters. * @return the configuration specification; may not be null */ ConfigDef config(); }
需要注意的是,不同的是 Converter,新 HeaderConverter介面擴展了 Configurable 現在對於可能具有附加配置屬性的 Connect 介面通用的介面。
現有實現 Converter 也可能實現 HeaderConverter,並且ConverterConnect 中的所有三個現有 實現都將相應地更改以通過序列化/反序列化 Header 值來實現這個新介面,類似於它們序列化/反序列化鍵和值的方式:
HeaderConverter 將添加一個新實現來將所有內置原語、數組、映射和結構與字元串表示形式相互轉換。與StringConverter使用 toString()方法的不同 SimpleHeaderConverter,除了不帶引號的簡單字元串值之外, 使用類似 JSON 的表示形式表示基本類型、數組、映射和結構。這種形式直接對應於許多開發人員認為將值序列化為字元串的方式,並且可以 SimpleHeaderConverter解析這些任何和所有這樣的值,並且大部分時間來推斷正確的模式。因此,這將用於HeaderConverterConnect 工作程式中使用的默認值 。
下表描述了SimpleHeaderConverter將如何持久化這些值,表格如下:
類型 | 描述 | 例子 |
BOOLEAN | true或者false | |
BYTE_ARRAY | 位元組數組的Base64編碼字元串 | |
INT8 | Java位元組的字元串表示形式 | |
INT16 | Java Short的字元串表示形式 | |
INT32 | Java Int的字元串表示形式 | |
INT64 | Java Long的字元串表示形式 | |
FLOAT32 | Java 浮點數的字元串表示形式 | |
FLOAT64 | Java Double的字元串表示形式 | |
STRING | 字元串的UTF-8表示 | |
ARRAY | 數組的類似 JSON 的表示形式。數組值可以是任何類型,包括基本類型和非基本類型。 | |
MAP | 類似 JSON 的表示形式。儘管大多數正確創建的映射都具有相同類型的鍵和值,但也支援具有任何鍵和值的映射。映射值可以是任何類型,包括基本類型和非基本類型。 | { “foo”: “value”, “bar”: “strValue”, “baz”: “other” } |
STRUCT | 類似 JSON 的表示形式。Struct 對象可以序列化,但反序列化時將始終解析為映射,因為模式不包含在序列化形式中。 | { “foo”: true, “bar”: “strValue”, “baz”: 1234 } |
DECIMAL | 對應的字元串表示java.math.BigDecimal。 | |
TIME | IOS-8601 時間表示,格式為「HH:mm:ss.SSS’Z’」。 | 16:31:05.387UTC |
DATE | 日期的 ISO-8601 表示,格式為「YYYY-MM-DD」。 | 2021-09-25 |
TIMESTAMP | 時間戳的 ISO-8601 表示,格式為「YYYY-MM-DD’T’HH:mm:ss.SSS’Z’」。 |
2021-09-25T 16:31:05.387UTC |
Connect 工作器需要配置為使用 HeaderConverter 實現,因此header.converter 將定義一個名為的附加工作器配置 ,默認為 SimpleHeaderConverter. 具有相同名稱和默認值的類似配置屬性將添加到連接器配置中,允許連接器覆蓋工作程式的 Header 轉換器。請注意,每個連接器任務都有自己的標頭轉換器實例,就像鍵和值轉換器一樣。
每個 Header 都有一個可由接收器連接器和簡單消息轉換使用的值。但是,標頭值的類型首先取決於標頭的創建方式以及它們的序列化和反序列化方式。將添加一組新的轉換實用程式方法,使 SMT 和接收器連接器可以輕鬆地將標頭值轉換為易於使用的類型。這些轉換可能需要原始架構和值。與字元串之間的轉換使用與上述相同的機制SimpleHeaderConverter。
例如,SMT 或接收器連接器可能期望標頭值為 long,並且可以使用這些實用方法來轉換任何數值(例如,int、short、String、BigDecimal 等)。或者,接收器連接器可能需要 Timestamp 邏輯數據類型,因此它可以使用該 Values.convertToTimestamp(s,v) 方法從時間戳或日期的任何 ISO-8601 格式字元串表示轉換,或表示為 long 或字元串的過去紀元的毫秒數。
這些實用方法可用於 Header 值或鍵、值或結構、數組和映射中的任何值。程式碼片段如下所示:
package org.apache.kafka.connect.data; public class Values { // All methods return null when value is null, and throw a DataException // if the value cannot be converted to the desired type. // If the value is already the desired type, these methods simply return it. public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...} public static Byte convertToByte(Schema schema, Object value) throws DataException {...} public static Short convertToShort(Schema schema, Object value) throws DataException {...} public static Integer convertToInteger(Schema schema, Object value) throws DataException {...} public static Long convertToLong(Schema schema, Object value) throws DataException {...} public static Float convertToFloat(Schema schema, Object value) throws DataException {...} public static Double convertToDouble(Schema schema, Object value) throws DataException {...} public static String convertToString(Schema schema, Object value) {...} public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...} public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...} public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...} public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...} // These only support converting from a compatible string form, which is the same // format used in the SimpleHeaderConverter described above public static List<?> convertToList(Object value) {...} public static Map<?, ?> convertToMap(Object value) {...} // Only supports returning the value if it already is a Struct. public static Struct convertToStruct(Object value) {...} }
在 Kafka 3.0 中優化和調整了以下內容:
在 Kafka 3.0 中修復了如下BUG:
在 Kafka 3.0 中的開發任務如下:
Kafka 3.0 的發布標誌著社區對 Kafka 項目邁向了一個新的里程牌。另外,感謝Kafka PMC對Kafka Eagle監控系統的認可,為了維護Apache社區的商標權益,現在對Kafka Eagle正式改名為EFAK(Eagle For Apache Kafka),EFAK會持續更新迭代優化,為大家管理Kafka集群和使用Kafka應用提供便利,歡迎大家使用EFAK,也可以到Github或者EAFK官網上關注 EFAK 的最新動態。
這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,部落客出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買部落客的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學影片。