Spark Java創建DataFrame

俗話說得好,磨刀不誤砍柴工,獻上一副來自國家5A級風景區美景圖。

述說正傳,接下來開始說正事。

以前用Python和Scala操作Spark的時候比較多,畢竟Python和Scala程式碼寫起來要簡潔很多。

今天一起來看看Java版本怎麼創建DataFrame,程式碼寫起來其實差不多,畢竟公用同一套API。測試數據可以參考我之前的文章。

先來總結下Spark的一般流程:

1,先創建Spark基礎變數,spark,sc

2,載入數據,rdd.textFile,spark.read.csv/json等

3,數據處理,mapPartition, map,filter,reduce等一系列transformation操作

4,數據保存,saveAstextFile,或者其他DataFrame方法

祭出程式碼

package dev.java;

import dev.utils.Utils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.List;

public class Spark1 {

    private static final String fileData = "seed";
    private static final String fileSave = "result";
    private static SparkSession spark = SparkSession.builder()
                .appName("Java-Spark")
                .master("local[*]")
                .config("spark.default.parallelism", 100)
                .config("spark.sql.shuffle.partitions", 100)
                .config("spark.driver.maxResultSize", "3g")
                .getOrCreate();
    private static JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    public static void main(String[] args) {
        Utils.delete(fileSave);
        //
        t1();
    }

    private static void t1() {
        JavaRDD<Row> rdd = sc.textFile(fileData)
                .map(v -> {
                    String[] parts = v.split("\t");
                    return RowFactory.create(parts[0], Long.parseLong(parts[1]));
                })
                .filter(v -> v.getLong(1) >= 10000)
                .sortBy(v -> v.getLong(1), false, 100)
                .coalesce(2);
        Dataset<Row> df = spark.createDataFrame(rdd, StructType.fromDDL("title string, qty long"));
        df.write().csv(fileSave);
        spark.stop();
    }
}

 

Tags: