Hudi 數據湖的插入,更新,查詢,分析操作示例

Hudi 數據湖的插入,更新,查詢,分析操作示例

作者:Grey

原文地址:

博客園:Hudi 數據湖的插入,更新,查詢,分析操作示例

CSDN:Hudi 數據湖的插入,更新,查詢,分析操作示例

前置工作

首先,需要先完成

Linux 下搭建 Kafka 環境

Linux 下搭建 Hadoop 環境

Linux 下搭建 HBase 環境

Linux 下搭建 Hive 環境

本文基於上述四個環境已經搭建完成的基礎上進行 Hudi 數據湖的插入,更新,查詢操作。

開發環境

Scala 2.11.8

JDK 1.8

需要熟悉 Maven 構建項目和 Scala 一些基礎語法。

操作步驟

master 節點首先啟動集群,執行:

stop-dfs.sh && start-dfs.sh

啟動 yarn,執行:

stop-yarn.sh && start-yarn.sh

然後準備一個 Mave 項目,在 src/main/resources 目錄下,將 Hadoop 的一些配置文件拷貝進來,分別是

$HADOOP_HOME/etc/hadoop/core-site.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/local/hadoop/tmp</value>
    </property>
</configuration>

注意,需要在你訪問集群的機器上配置 host 文件,這樣才可以識別 master 節點。

$HADOOP_HOME/etc/hadoop/hdfs-site.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>

$HADOOP_HOME/etc/hadoop/yarn-site.xml 文件,目前還沒有任何配置

<?xml version="1.0"?>

<configuration>
</configuration>

然後,設計實體的數據結構,

package git.snippet.entity

case class MyEntity(uid: Int,
                    uname: String,
                    dt: String
                   )

插入數據代碼如下

package git.snippet.test


import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataInsertion {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString //生成提交時間
    val df = sparkSession.read.text("/mydata/data1")
      .mapPartitions(partitions => {
        partitions.map(item => {
          val jsonObject = JsonUtil.getJsonData(item.getString(0))
          MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
        })
      })
    val result = df.withColumn("ts", lit(commitTime)) //添加ts 時間戳列
      .withColumn("uuid", col("uid"))
      .withColumn("hudipart", col("dt")) //增加hudi分區列
    result.write.format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
      .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
      .option("hoodie.table.name", "myDataTable")
      .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分區列
      .mode(SaveMode.Overwrite)
      .save("/snippet/data/hudi")
  }
}

然後,在 master 節點先準備好數據

vi data1

輸入如下數據

{'uid':1,'uname':'grey','dt':'2022/09'}
{'uid':2,'uname':'tony','dt':'2022/10'}

然後創建文件目錄,

hdfs dfs -mkdir /mydata/

把 data1 放入目錄下

hdfs dfs -put data1 /mydata/

訪問://192.168.100.130:50070/explorer.html#/mydata

可以查到這個數據

image

接下來執行插入數據的 scala 代碼,執行完畢後,驗證一下

訪問://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022

可以查看到插入的數據

image

準備一個 data2 文件

cp data1 data2 && vi data2

data2 的數據更新為

{'uid':1,'uname':'grey1','dt':'2022/11'}
{'uid':2,'uname':'tony1','dt':'2022/12'}

然後執行

hdfs dfs -put data2 /mydata/

更新數據的代碼,我們可以做如下調整,完整代碼如下

package git.snippet.test

import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataUpdate {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    updateData(sparkSession)
  }

  def updateData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString //生成提交時間
    val df = sparkSession.read.text("/mydata/data2")
      .mapPartitions(partitions => {
        partitions.map(item => {
          val jsonObject = JsonUtil.getJsonData(item.getString(0))
          MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
        })
      })
    val result = df.withColumn("ts", lit(commitTime)) //添加ts 時間戳列
      .withColumn("uuid", col("uid")) //添加uuid 列
      .withColumn("hudipart", col("dt")) //增加hudi分區列
    result.write.format("org.apache.hudi")
      //      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
      .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
      .option("hoodie.table.name", "myDataTable")
      .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分區列
      .mode(SaveMode.Append)
      .save("/snippet/data/hudi")
  }
}

執行更新數據的代碼。

驗證一下,訪問://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022

可以查看到更新的數據情況

image

數據查詢的代碼也很簡單,完整代碼如下

package git.snippet.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataQuery {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    queryData(sparkSession)
  }


  def queryData(sparkSession: SparkSession) = {
    val df = sparkSession.read.format("org.apache.hudi")
      .load("/snippet/data/hudi/*/*")
    df.show()
    println(df.count())
  }
}

執行,輸出以下信息,驗證成功。

image

數據查詢也支持很多查詢條件,比如增量查詢,按時間段查詢等。

接下來是 flink 實時數據分析的服務,首先需要在 master 上啟動 kafka,並創建 一個名字為 mytopic 的 topic,詳見Linux 下搭建 Kafka 環境

相關命令如下

創建topic

kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topic  mytopic

生產者啟動配置

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic

消費者啟動配置

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic

然後運行如下代碼

package git.snippet.analyzer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class DataAnalyzer {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
        properties.setProperty("group.id", "snippet");
        //構建FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
        //指定偏移量
        myConsumer.setStartFromLatest();
        final DataStream<String> stream = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        stream.print();
        try {
            env.execute("DataAnalyzer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中

properties.setProperty("bootstrap.servers", "192.168.100.130:9092");

根據自己的配置調整,然後通過 kakfa 的生產者客戶端輸入一些數據,這邊可以收到這個數據,驗證完畢。

完整代碼見

data-lake