折騰一晚上的事情,明白了一個道理

感悟:有時候很簡單的笨辦法,比那些高大上的技術要實用的多。

有一個數據同步,大約4億條記錄,沒有分區。現在要按照天,小時分區寫入到iceberg的分區表中。
源數據中本身就是很多幾十k大小的非常多的小文件。

於是在讀取時,總想着要shuffle,合併小文件,於是是這樣的:

hive_df = spark.table("xx.hive_test")
hive_df.repartition(1).write.format('iceberg').mode('overwrite').save('iceberg_dw.iceberg_test')

但是iceberg表寫入分區表時,需要根據分區本地排序,或者全局排序,於是寫成這樣:

hive_df = spark.table("xx.hive_test")
hive_df.repartition(1).sortWithinPartitions("pk_day","pk_hour").write.format('iceberg').mode('overwrite').save('iceberg_dw.iceberg_test')

但是執行起來後,分區排序合併都沒有問題,但是由於數據量巨大,都卡在最後的1個partition任務那。於是又改成了多個重分區。

hive_df = spark.table("xx.hive_test")
hive_df.repartition(20).sortWithinPartitions("pk_day","pk_hour").write.format('iceberg').mode('overwrite').save('iceberg_dw.iceberg_test')

但是依然很慢,於是加大了分區

hive_df = spark.table("xx.hive_test")
hive_df.repartition(100).sortWithinPartitions("pk_day","pk_hour").write.format('iceberg').mode('overwrite').save('iceberg_dw.iceberg_test')

折騰了大半夜,通過增大excuter個數,內存大小還是不行。

早上醒來,忽然想到了大數據的分而治之的想法,以前跑數據的時候也干過,將源頭數據分幾段來讀取寫入,多跑幾個任務就可以。
於是修改為以下代碼,通過傳入年月日,時分秒,再轉換為對應的時間戳取查詢源頭的記錄,將單個小時的合併為1個分區寫入目標表。
pySpark程序命名為hive2iceberg.py

from pyspark.sql import SparkSession
import sys,datetime,time,pytz
day_hour = sys.argv[1] + ' ' + sys.argv[2] # 2022-09-02 09:00:00
utc_timestamp = time.mktime(datetime.datetime.strptime(day_hour, '%Y-%m-%d %H:%M:%S').replace(tzinfo=pytz.utc).timetuple())
dt = int(utc_timestamp) * 1000
pk_day = dt - dt % 86400000
dayBeforeYesterday = (dt - dt % 86400000) - 86400000
pk_hour = dt - dt % 3600000

print("=====================================================================================================================")
print(">>>>>>>>>>>>>>>>日期小時:" + str(day_hour) +  " pk_day:" + str(pk_day), "dayBeforeYesterday:" + str(dayBeforeYesterday) + " hour:" + str(pk_hour))
print("=====================================================================================================================")

pyfilename= "hive2iceberg"
spark = SparkSession \
    .builder \
    .config("spark.sql.shuffle.partitions", "1000") \
    .config("spark.driver.maxResultSize", "6g") \
    .config("spark.debug.maxToStringFields", "1000") \
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.sql.crossJoin.enabled","true") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("iceberg.engine.hive.enabled","true") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("hive.exec.max.dynamic.partitions", "5000") \
    .appName(pyfilename) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.table("xx.hive_test").where("pk_day = %d and pk_hour = %d " % (pk_day,pk_hour))
df.repartition(1).sortWithinPartitions("pk_day","pk_hour").write.format('iceberg').mode('overwrite').save('iceberg_dw.iceberg_test')

然後又準備了一段shell腳本run_state_hive2iceberg_fixdata.sh如下:

#!/bin/sh

if [ $# = 1 ]
then
    start_dt=$1
    end_dt=$1

elif [ $# = 2 ]
then
    start_dt=$1
    end_dt=$2

elif [ $# = 0 ]
then
    start_dt=`date -d '-1 hour' +%Y%m%d-%H`
    end_dt=`date -d '-1 hour' +%Y%m%d-%H`
fi

date_format_1=${start_dt:0:4}-${start_dt:4:2}-${start_dt:6:2}' '${start_dt:9:2}:00:00
date_format_2=${end_dt:0:4}-${end_dt:4:2}-${end_dt:6:2}' '${end_dt:9:2}:00:00

start_sec=`date -d "$date_format_1" +%s`
end_sec=`date -d "$date_format_2" +%s`


for ((i=$start_sec;i<=$end_sec;i+=3600))
do

file_day=`date -d @$i +%Y%m%d`
data_day=`date -d @$i +%Y-%m-%d`
data_hour=`date -d @$i +"%H:00:00"`

echo ${data_day} ${data_hour}

spark-submit \
--conf spark.dynamicAllocation.enabled=false \
--name hive2iceberg \
--master yarn \
--deploy-mode cluster \
--queue prod \
--driver-memory 2G \
--num-executors 5 \
--executor-memory 5G \
--executor-cores 2 \
--archives hdfs://ns1/user/hadoop/mypy3spark_env/py3spark.tar.gz#py3spark \
--conf "spark.pyspark.python=./py3spark/py3spark/bin/python" \
--conf "spark.pyspark.driver.python=./py3spark/py3spark/bin/python" \
/home/hadoop/hive2iceberg.py ${data_day} ${data_hour}

status=$?

if [ $status = 0 ]; then
       echo "STATUS=SUCCESS"
else
       echo "STATUS=FAIL"
fi

查詢源表裏面的最大時間和最小時間,然後將時間切割成多個數據段,於是得到下面的執行腳本,因為上面的shell是循環按年月日小時跑的,所以放在後台執行即可。

nohup ./run_state_hive2iceberg_fixdata.sh 2021050504 2021070100 > 2021070100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2021070100 2021090100 > 2021090100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2021090100 2021110100 > 2021110100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2021110100 2022010100 > 2022010100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2022010100 2022040100 > 2022040100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2022040100 2022060100 > 2022060100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2022060100 2022080100 > 2022080100.log 2>&1 &
nohup ./run_state_hive2iceberg_fixdata.sh 2022080100 2022090220 > 2022090220.log 2>&1 &

自己大概計算了下,一個小時的數據量很小,從hive讀取到寫入iceberg也就1分鐘左右,所以一個小時的時間可以跑數據的大概60個小時。也就是一天可以跑數據的1440個小時,也就是2個月。
那麼從2021年5月-2022年9月,大概17個月,我分成了8個任務,每個任務跑2個月,大概全部跑完,也就一天的時間。
而且實際計算了下,使用的總算力比跑一個大任務還少不少。

現在想想有時候真的越笨的方法,越是簡單輕鬆。
好了,任務掛到後台,可以補覺去了。
image

Tags: