Kafka數據每5分鐘同步到Hive
1.概述
最近有同學留言諮詢Kafka數據落地到Hive的一些問題,今天筆者將為大家來介紹一種除Flink流批一體以外的方式(流批一體下次再單獨寫一篇給大家分享)。
2.內容
首先,我們簡單來描述一下數據場景,比如有這樣一個數據場景,有一批實時流數據實時寫入Kafka,然後需要對Topic中的數據進行每隔5分鐘進行落地到Hive,進行每5分鐘分區存儲。流程圖如下所示:
2.1 環境依賴
整個流程,需要依賴的組件有Kafka、Flink、Hadoop。由於Flink提交需要依賴Hadoop的計算資源和存儲資源,所以Hadoop的YARN和HDFS均需要啟動。各個組件版本如下:
組件 | 版本 |
Kafka | 2.4.0 |
Flink | 1.10.0 |
Hadoop | 2.10.0 |
2.2 每分鐘落地HDFS實現
Flink消費Kafka集群中的數據,需要依賴Flink包,依賴如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.12</artifactId> <version>${flink.connector.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>${flink.kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.streaming.version}</version> </dependency>
編寫消費Topic的Flink代碼,這裡不對Topic中的數據做邏輯處理,在後面統一交給MapReduce來做數據預處理,直接消費並存儲到HDFS上。代碼如下:
public class Kafka2Hdfs { private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class); public static void main(String[] args) { if (args.length != 3) { LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist."); return; } String bootStrapServer = args[0]; String hdfsPath = args[1]; int parallelism = Integer.parseInt(args[2]); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.setParallelism(parallelism); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer))); // Storage into hdfs BucketingSink<String> sink = new BucketingSink<>(hdfsPath); sink.setBucketer(new JDateTimeBucketer<String>("HH-mm"));// 自定義存儲到HDFS上的文件名,用小時和分鐘來命名,方便後面算策略 sink.setBatchSize(1024 * 1024 * 4); // this is 5MB sink.setBatchRolloverInterval(1000 * 30); // 30s producer a file into hdfs transction.addSink(sink); env.execute("Kafka2Hdfs"); } private static Object configByKafkaServer(String bootStrapServer) { Properties props = new Properties(); props.setProperty("bootstrap.servers", bootStrapServer); props.setProperty("group.id", "test_bll_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } }
2.3 注意事項
- 這裡我們把時間窗口設置小一些,每30s做一次檢查,如果該批次的時間窗口沒有數據過來,就生成一個文件落地到HDFS上;
- 另外,我們重寫了DateTimeBucketer為JDateTimeBucketer,邏輯並不複雜,在原有的方法上加一個年-月-日/時-分的文件生成路徑,例如在HDFS上的生成路徑:xxxx/2020-12-26/00-00
2.4 數據預處理
這裡,我們需要對落地到HDFS上的文件進行預處理,處理的邏輯是這樣的。比如,現在是2020-12-26 14:00,那麼我們需要將當天的13:55,13:56,13:57,13:58,13:59這最近5分鐘的數據處理到一起,並加載到Hive的最近5分鐘的一個分區裏面去。那麼,我們需要生成這樣一個邏輯策略集合,用HH-mm作為key,與之最近的5個文件作為value,進行數據預處理合併。
實現代碼如下:
public class DateRange { public static void main(String[] args) { for (int i = 0; i < 24; i++) { for (int j = 0; j < 60; j++) { if (j % 5 == 0) { if (j < 10) { if (i < 10) { if (i == 0 && j == 0) { System.out.println("0" + i + "-0" + j + "=>23-59,23-58,23-57,23-56,23-55"); } else { if (j == 0) { String tmp = ""; for (int k = 1; k <= 5; k++) { tmp += "0" + (i - 1) + "-" + (60 - k) + ","; } System.out.println("0" + i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } else { String tmp = ""; for (int k = 1; k <= 5; k++) { if (j - k < 10) { tmp += "0" + i + "-0" + (j - k) + ","; } else { tmp += "0" + i + "-" + (j - k) + ","; } } System.out.println("0" + i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } } } else { if (j == 0) { String tmp = ""; for (int k = 1; k <= 5; k++) { if (i - 1 < 10) { tmp += "0" + (i - 1) + "-" + (60 - k) + ","; } else { tmp += (i - 1) + "-" + (60 - k) + ","; } } System.out.println(i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } else { String tmp = ""; for (int k = 1; k <= 5; k++) { if (j - k < 10) { tmp += i + "-0" + (j - k) + ","; } else { tmp += i + "-" + (j - k) + ","; } } System.out.println(i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } } } else { if (i < 10) { String tmp = ""; for (int k = 1; k <= 5; k++) { if (j - k < 10) { tmp += "0" + i + "-0" + (j - k) + ","; } else { tmp += "0" + i + "-" + (j - k) + ","; } } System.out.println("0" + i + "-" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } else { String tmp = ""; for (int k = 1; k <= 5; k++) { if (j - 1 < 10) { tmp += i + "-0" + (j - k) + ","; } else { tmp += i + "-" + (j - k) + ","; } } System.out.println(i + "-" + j + "=>" + tmp.substring(0, tmp.length() - 1)); } } } } } } }
預覽結果如下:
需要注意的是,如果發生了第二天00:00,那麼我們需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55這5個文件中的數據來做預處理。
2.5 數據加載
準備好數據後,我們可以使用Hive的load命令直接加載HDFS上預處理的文件,把數據加載到對應的表中,實現命令如下:
load data inpath '/cluster01/hive/hfile/data/min/2020-12-26/14-05/' overwrite into table jketable partition(day='2020-12-26-14-05')
這裡,我們在執行命令時,可能文件不存在會導致加載出錯。那我們在加載HDFS路徑之前,先判斷一下路徑是否存在。
實現腳本如下所示:
hdfs dfs -ls /cluster01/hive/hfile/data/min/2020-12-26/14-05/ | wc -l > /tmp/hdfs_check_files.txt hdfs_check_files=`cat /tmp/hdfs_check_files.txt`
# 判斷HDFS上文件是否存在 if [ $hdfs_check_files -eq 0 ] then echo "Match file is null.Stop hive load script." else echo "Match file is exist.Start hive load script." hive -e "load data inpath '/cluster01/hive/hfile/data/min/2020-12-26/14-05/' overwrite into table jketable partition(day='2020-12-26-14-05')" fi
3.總結
整個流程為,先使用Flink消費存儲在Kafka中的數據,按照每分鐘進行存儲,然後將具體需要聚合的時間段進行策略生成,比如每5分鐘、10分鐘、15分鐘等等,可以在DateRange類中修改對應的策略邏輯。最後,再將預處理好的數據使用hive命令進行加載。整個過程,流程較多。如果我們使用Flink的流批一體特性,可以通過Flink直接建表,然後使用Flink消費Kafka中的數據後,直接分區落地到Hive表,這個就留到下次再給大家分享吧。
4.結束語
這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。