Spark3學習【基於Java】3. Spark-Sql常用API
學習一門開源技術一般有兩種入門方法,一種是去看官網文檔,比如Getting Started – Spark 3.2.0 Documentation (apache.org),另一種是去看官網的例子,也就是%SPARK_HOME%\examples下面的程式碼。打開IDEA,選擇File-Open…
跟前面文章中方法一樣導入jars目錄到classpath。
Spark解析json字元串
第一個例子是讀取並解析Json。這個例子的結果讓我有些震驚,先上程式碼:
-
public
static
void main(String[] args) {
-
SparkSession session = SparkSession.builder().master(“local[1]“).appName(“SparkSqlApp“).getOrCreate();
-
-
Dataset<Row> json = session.read().json(“spark-core/src/main/resources/people.json“);
-
json.show();
-
}
讓我驚訝的是文件的內容。例子裡面的文件是三個大括弧並列,文件擴展名是.json,由於沒有中括弧,所以格式是錯的:
-
{“name“:”Michael”}
-
{“name“:”Andy”, “age”:30}
-
{“name“:”Justin”, “age”:19}
但是spark解析出來了:
於是我把文件改成下面這樣向看下結果
-
[{“name“:”Michael”},
-
{“name“:”Andy”, “age”:30},
-
{“name“:”Justin”, “age”:19}
-
]
你猜輸出是什麼?
顯然,spark沒有解析出第一行,而且把第4行也解析了。這也說明了為什麼樣例的文件可以解析:首先跟文件擴展名是沒啥關係的,另外spark是按行解析,只要考慮這一行是否符合解析要求就可以,行末可以有逗號。所以把文件改成下面也是可以的
-
{“name“:”Michael”},
-
{“name“:”Andy”, “age”:30},..
-
{“name“:”Justin”, “age”:19}
第一行後面有逗號,第二行後面還有兩個點。
SQL 查詢
在之前的例子中,讀取文件返回的是Dataset<String>,因為之前確實是讀取的文件內容。現在使用json()方法返回的是DataFrame,數據是經過spark處理過的。
DataFrame提供了一些好用的方法,用的最多的就是show()。它主要用於調試,可以把數據以表格形式列印。spark確實給DataFrame生成了表結構,可以通過printSchema()方法查看
不但有欄位名,還有欄位類型,還有是否可空(好像都能空)。
DF還提供了類似於sql查詢的方法,比如select()/groupBy(),和where類似的filter()等:
這裡我們首先給年齡欄位+1,並通過別名(相等於SQL里的AS)讓他覆蓋之前的欄位,然後查詢比19大的記錄,最後根據年齡分組匯總。
如果我們把新欄位不覆蓋原欄位呢?你猜是執行報錯還是啥結果?
That’s all?當然不是,Spark提供了更強大的SQL操作:視圖
View
視圖分臨時視圖和全局視圖。臨時視圖時會話級別的,會話結束了視圖就沒了;全局視圖時應用級別的,只要Spark應用不停,視圖就可以跨會話使用。
可見臨時視圖和全局視圖可以叫一樣的名字,它們的內容互不干擾。因為要訪問全局視圖需要通過global_temp庫。不信你可以這樣試一下
-
Dataset<Row> group = json.select(col(“name“), col(“age“).plus(1).alias(“age1“))
-
.filter(col(“age“).gt(19))
-
.groupBy(“age1“)
-
.count();
-
-
group.createOrReplaceTempView(“people“);
-
json.createOrReplaceGlobalTempView(“people“);
-
Dataset<Row> temp = session.sql(“select * from people“);
-
Dataset<Row> global = session.sql(“select * from global_temp.people“);
-
Dataset<Row> global1 = session.newSession().sql(“select * from global_temp.people“);
-
temp.show();
-
global.show();
-
global1.show();
Dataset
我們已經跟Dataset打過不少交道了,這裡再稍晚多說一點點。實際上如果你是自己摸索而不是完全看我寫的,下面這些內容估計都已經探索出來了。
1 轉換自DF
DF是無類型的,Dataset是有類型的。如果要把無類型的轉成有類型的,就需要提供一個類型定義,就像mysql表和Java的PO一樣。
先來定義Java類:
-
public
class Person implements Serializable {
-
private String name;
-
private
long age;
-
-
public String getName() {
-
return name;
-
}
-
-
public
void setName(String name) {
-
this.name = name;
-
}
-
-
public
long getAge() {
-
return age;
-
}
-
-
public
void setAge(long age) {
-
this.age = age;
-
}
-
}
這個類必須實現序列化介面,原因在前面也說過了。
接下來把讀入json的DataFrame轉成Dataset:
之前都是使用Encoders內置的編碼器,這裡通過bean()方法生成我們自定義類的編碼器,然後傳給DF的as()方法就轉成了Dataset。
既然轉成了強類型的Dataset,那能把每一個對象拿出來嗎?給Person類增加toString方法,然後遍歷Dataset:
結果報錯了竟然:已經生成了集合,卻不能訪問元素?
報錯原因很簡單:我們類中的age是原始數據類型,但是實際數據有一個null。把long age改成Long age即可:
但是為什麼會這樣呢?!~我猜是因為as方法用的編碼器(序列化工具)和foreach用到的解碼器不匹配,spark的編碼器不要求數據符合Java編譯規則。
來自Java集合
目前我們掌握了通過讀取文件(textFile(path))、轉化其他Dataset(map/flatMap)和轉換DF來生成Dataset,如果已經有一堆數據了,也可以直接創建。
SparkSession重載了大量根據數據集生成Dataset和DataFrame的方法,可以自由選擇:
所以我們創建一個List來生成,只能是List,不能是Collection
神奇的是原本應該一樣的程式碼,執行的時候有一個報錯。這個算Java實現的BUG吧,原因參考Java中普通lambda表達式和方法引用本質上有什麼區別? – RednaxelaFX的回答 – 知乎
//www.zhihu.com/question/51491241/answer/126232275
轉自RDD
RDD 在Java環境下叫JavaRDD。它也是數據集,可以和Dataset/DataFrame互轉。這裡不說了,有興趣可以探索。