在位元組跳動,一個更好的企業級SparkSQL Server這麼做

  • 2022 年 5 月 11 日
  • 筆記

SparkSQL是Spark生態系統中非常重要的組件。面向企業級服務時,SparkSQL存在易用性較差的問題,導致難滿足日常的業務開發需求。本文將詳細解讀,如何通過構建SparkSQL服務器實現使用效率提升和使用門檻降低。

前言

Spark 組件由於其較好的容錯與故障恢復機制,在企業的長時作業中使用的非常廣泛,而SparkSQL又是使用Spark組件中最為常用的一種方式。

相比直接使用編程式的方式操作Spark的RDD或者DataFrame的API,SparkSQL可直接輸入SQL對數據進行ETL等工作的處理,極大提升了易用度。但是相比Hive等引擎來說,由於SparkSQL缺乏一個類似Hive Server2的SQL服務器,導致SparkSQL在易用性上比不上Hive。

很多時候,SparkSQL只能將自身SQL作業打包成一個Jar,進行spark-submit命令提交,因而大大降低Spark的易用性。除此之外,還可使用周邊工具,如Livy,但Livy更像一個Spark 服務器,而不是SparkSQL服務器,因此無法支持類似BI工具或者JDBC這樣的標準接口進行訪問。

雖然Spark 提供Spark Thrift Server,但是Spark Thrift Server的局限非常多,幾乎很難滿足日常的業務開發需求,具體的分析請查看:觀點|SparkSQL在企業級數倉建設的優勢

標準的JDBC接口

Java.sql包下定義了使用Java訪問存儲介質的所有接口,但是並沒有具體的實現,也就是說JavaEE裏面僅僅定義了使用Java訪問存儲介質的標準流程,具體的實現需要依靠周邊的第三方服務實現。

例如,訪問MySQL的mysql-connector-java啟動包,即基於java.sql包下定義的接口,實現了如何去連接MySQL的流程,在代碼中只需要通過如下的代碼方式:

Class.forName("com.mysql.cj.jdbc.Driver");Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);//操作connection.close();

第一,初始化驅動、創建連接,第二,基於連接進行對數據的操作,例如增刪改查。可以看到在Java定義的標準接口訪問中,先創建一個connection完成存儲介質,然後完成connection後續操作。

性能問題導致單次請求實時創建connection的性能較差。因此我們往往通過維護一個存有多個connection的連接池,將connection的創建與使用分開以提升性能,因而也衍生出很多數據庫連接池,例如C3P0,DBCP等。

Hive 的JDBC實現

構建SparkSQL服務器最好的方式是用如上Java接口,且大數據生態下行業已有標杆例子,即Hive Server2。Hive Server2在遵循Java JDBC接口規範上,通過對數據操作的方式,實現了訪問Hive服務。除此之外,Hive Server2在實現上,與MySQL等關係型數據稍有不同。

首先,Hive Server2本身是提供了一系列RPC接口,具體的接口定義在org.apache.hive.service.rpc.thrift包下的TCLIService.Iface中,部分接口如下:

public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;
public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;
public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;
public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;
public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;
public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;
public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;
public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;
public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;

也就是說,Hive Server2的每一個請求都是獨立的,並且是通過參數的方式將操作和認證信息傳遞。Hive 提供了一個JDBC的驅動實現,通過如下的依賴便可引入:

<dependency>    <groupId>org.apache.hive</groupId>    <artifactId>hive-jdbc</artifactId>    <version>version/version></dependency>

在HiveConnection類中實現了將Java中定義的SQL訪問接口轉化為調用Hive Server2的RPC接口的實現,並且擴充了一部分Java定義中缺乏的能力,例如實時的日誌獲取。但是使用該能力時,需要將對應的實現類轉換為Hive的實現類,例如:

HiveStatement hiveStatement = (HiveStatement) connection.createStatement();List<String> logs = hiveStatement.getQueryLog();

Log獲取也需調用FetchResult接口,通過不同的參數來區分獲取Log信息還是獲取內容信息,因此,Hive JDBC封裝的調用Hive Server2 RPC接口流程是:

 

圖片

 

如果該流程觸發獲取MetaData、獲取Functions等操作,則會調用其他接口,其中身份信息即token,是用THandleIdentifier類進行封裝。在OpenSession時,由Hive Server2生成並且返回,後續所有接口都會附帶傳遞這個信息,此信息是一次Connection連接的唯一標誌。

但是,Hive Server2在FetchResults方法中存在bug。由於Hive Server2沒有很好處理hasMoreRows字段,導致Hive JDBC 客戶端並未通過hasMoreRows字段去判斷是否還有下一頁,而是通過返回的List是否為空來判斷。因此,相比Mysql Driver等驅動,Hive會多發起一次請求,直到返回List為空則停止獲取下一頁,對應的客戶端的JDBC代碼是:

ResultSet rs = hiveStatement.executeQuery(sql);while (rs.next()) {    // }

即Hive JDBC實現next方法是通過返回的List是否為空來退出while循環。

構建SparkSQL服務器

介紹了JDBC接口知識與Hive的JDBC知識後,如果要構建一個SparkSQL服務器,那麼這個服務器需要有以下幾個特點:

  • 支持JDBC接口,即通過Java 的JDBC標準進行訪問,可以較好與周邊生態進行集成且降低使用門檻。

  • 兼容Hive協議,如果要支持JDBC接口,那麼需要提供SparkSQL的JDBC Driver。目前,大數據領域Hive Server2提供的Hive-JDBC-Driver已經被廣泛使用,從遷移成本來說最好的方式就是保持Hive的使用方式不變,只需要換個端口就行,也就是可以通過Hive的JDBC Driver直接訪問SparkSQL服務器。

  • 支持多租戶,以及類似用戶名+密碼和Kerberos等常見的用戶認證能力。

  • 支持跨隊列提交,同時支持在JDBC的參數裏面配置Spark的相關作業參數,例如Driver Memory,Execute Number等。

這裡還有一個問題需要考慮,即用戶通過SparkSQL服務器提交的是一段SQL代碼,而SparkSQL在執行時需要向Yarn提交Jar。那麼,如何實現SQL到Jar提交轉換?

一個最簡單的方式是,用戶每提交一個SQL就執行一次spark-submit命令,將結果保存再緩存,提供給客戶端。還有更好方式,即提交一個常駐的Spark 作業,這個作業是一個常駐任務,作業會開啟一個端口,用來接收用戶的SQL進行執行,並且保存。

但是為了解決類似Spark Thrift Server的問題,作業需要和用戶進行綁定,而不是隨着Spark的組件啟動進行綁定,即作業的提交以及接收哪個用戶的請求,均來自於用戶的行為觸發。

 

圖片

有了這樣幾個大的方向後,便可以開始開發SparkSQL服務器。首先需要實現TCLIService.Iface下的所有接口,下面用代碼+注釋的方式來講述這些Thrift接口的含義,以及如果實現一個SparkSQL服務器,需要在這些接口做什麼內容:

public class SparkSQLThriftServer implements TCLIService.Iface {
    @Override
    public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
        //Hive JDBC Driver在執行創建Connection的時候會調用此接口,在這裡維護一個用戶與Spark 作業的對應關係。
        //來判斷是需要復用一個已經存在的Spark作業,還是全新執行一次spark-submt。
        //用戶與是否需要spark-submit的關聯關係均在這裡實現。
        //同時需要生成THandleIdentifier對象,並且和用戶身份進行關聯,後續其他方法調用均需要使用這個對象關聯出用戶的信息。
        return null;
    }

    @Override
    public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException {
        //客戶端調用connection.close()方法後會進入到這裡,在這裡進行用戶狀態的清除,同時需要基於用戶的情況判斷是否需要停止用來執行該用戶SQL的Spark 作業引擎。
        return null;
    }

    @Override
    public TGetInfoResp GetInfo(TGetInfoReq req) throws TException {
        //獲取服務器的元數據信息,例如使用BI工具,在命令會列出所連接的服務的版本號等信息,均由此方法提供。
        return null;
    }

    @Override
    public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
        //執行SQL任務,這裡傳遞過來的是用戶在客戶端提交的SQL作業,接收到用戶SQL後,將該SQL發送給常駐的Spark作業,這個常駐的作業在OpenSession的時候已經確定。
        return null;
    }

    @Override
    public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException {
        //獲取數據庫支持的類型信息,使用BI工具,例如beeline的時候會調用到這裡。
        return null;
    }

    @Override
    public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException {
        //獲取Catalog,使用BI工具,例如beeline的時候會調用到這裡。
        return null;
    }


    @Override
    public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
        //返回查詢結果,基於THandleIdentifier對象查詢到用戶的SQL執行的情況,將請求轉發至常駐的Spark 實例,獲取結果。
        //參數中通過TFetchResultsReq的getFetchType來區分是獲取日誌數據還是查詢結果數據,getFetchType == 1為獲取Log,為0是查詢數據查詢結果。
        return null;
    }


}

 

我們採用復用當前生態的方式,來實現兼容Hive JDBC Driver的服務器。有了上面的Thrift接口實現後,則需要啟動一個Thrift服務,例如:

TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport)
        .processorFactory(new TProcessorFactory(this))
        .transportFactory(new TSaslServerTransport.Factory())
        .protocolFactory(new TBinaryProtocol.Factory())
        .inputProtocolFactory(
                new TBinaryProtocol.Factory(
                        true,
                        true,
                        10000,
                        10000

                )
        )
        .requestTimeout(1000L)
        .requestTimeoutUnit(TimeUnit.MILLISECONDS)
        .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService);
thriftArgs
        .executorService(
                new ThreadPoolExecutor(
                        config.getMinWorkerThreads(),
                        config.getMaxWorkerThreads(),
                        config.getKeepAliveTime(),
        TimeUnit.SECONDS, new SynchronousQueue<>()));

TThreadPoolServer server = new TThreadPoolServer(thriftArgs);
server.serve();

 

至此便開發了一個支持Hive JDBC Driver訪問的服務器,並且在這個服務器的方法中,實現了對Spark 作業的管理。後續,還需要開發預設Spark Jar,Jar同樣實現了如上接口,只是該作業的實現是實際執行用戶的SQL。

經過前面的流程,已經完成一個可以工作SparkSQL服務器開發,擁有接收用戶請求,執行SQL,並且返回結果的能力。但如何做的更加細緻?例如,如何實現跨隊列的提交、如何實現用戶細粒度的資源管理、如何維護多個Spark 作業的連接池,我們接下來會講到。

 

圖片

 

由於對於Spark作業在Yarn上的提交,運行,停止均由SparkSQL服務器管理,對用戶是不可見的,用戶只需要編寫標準的JDBC代碼即可,因此可以基於用戶的參數信息來匹配合適的引擎去執行,同時還可以限制一個Spark 常駐作業的任務個數,實現更加靈活的SparkSQL作業的管理,同時也可以實現類似C3P0連接池的思想,維護一個用戶信息到Spark常駐作業的關聯池。

SparkSQL服務器的HA

Hive Server2在啟動的時候會將自己的服務器信息寫入Zookeeper中,結構體如下所示:

[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]

當連接HA模式下的服務器的時候,Hive JDBC Driver的URL需要切換成zookeeper的地址,Hive JDBC Driver會從多個地址中隨機選擇一個,作為該Connection的地址,在整個Connection中均會使用該地址。

因此對於我們實現的SparkSQL服務器,只需要在服務器啟動的時候,保持與Hive一致的數據格式,將自己的服務器的地址信息寫入到Zookeeper中即可,便可通過標準的zk地址進行訪問,例如:

./bin/beeline -u  "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=自定義的節點;auth=LDAP"  -n 用戶名 -p密碼

由於服務器的選擇基於Connection級別的,也就是在Connection被生成新的之前,整個服務器的地址是不會發生變化的,在發生錯誤的時候服務端可以進行重試,進行地址的切換,因此HA的力度是在Connection級別而非請求級別。

對接生態工具

完成以上開發之後,即可實現在大部分的場景下,使用標準的Hive驅動只需要切換一個端口號。特別提到Hue工具,由於和Hive的集成並未使用標準的JDBC接口,而是直接分開調用的Hive Server2的Thrift接口,也就是Hue自行維護來如何訪問Thrift的接口的順序問題。

可以發現在這樣的情況會有一個問題就是對於Hue來說,並沒有Connection的概念,正常的SparkSQL在JDBC的交互方式下處理流程是:

 

圖片

 

由於Hue沒有Connection概念,因此Hue的請求並不會先到OpenSession,而是直接發起ExecuteStatement。由於沒有上下文信息,正常流程下ExecuteStatement處接收到Hue的請求會發現該請求為非法,所以OpenSession不能作為連接的起點,而是需要在每一個Thrift接口處實現OpenSession的能力,以此作為上下文初始化。

尾聲

SparkSQL在企業中的使用比重越來越大,而有一個更好用的SQL服務器,則會大大提升使用效率和降低使用門檻。目前,SparkSQL在服務器這方面的能力顯然不如Hive Server2提供的更加標準,所以各個企業均可基於自身情況,選擇是否需要開發一個合適於自身的SparkSQL服務器。

本文所提到的相關能力已通過火山引擎EMR產品向外部企業開放。結合位元組跳動內部以及外部客戶的需求情況,火山引擎EMR產品的Ksana for SparkSQL提供一個生產可用的SparkSQL服務器,並且在Spark 性能方面也做了較大的優化,本文主要圍繞技術實現的角度來闡述如何實現一個SparkSQL服務,後續會有更多文章講述其他相關的優化。

產品介紹

火山引擎 E-MapReduce

支持構建開源Hadoop生態的企業級大數據分析系統,完全兼容開源,提供 Hadoop、Spark、Hive、Flink集成和管理,幫助用戶輕鬆完成企業大數據平台的構建,降低運維門檻,快速形成大數據分析能力。

 

更多技術交流、求職機會、試用福利,歡迎關注位元組跳動數據平台微信公眾號,回復【1】進入官方交流群