Spark SQL讀資料庫時不支援某些數據類型的問題

  • 2019 年 12 月 5 日
  • 筆記

在大數據平台中,經常需要做數據的ETL,從傳統關係型資料庫RDBMS中抽取數據到HDFS中。之前開發數據湖新版本時使用Spark SQL來完成ETL的工作,但是遇到了 Spark SQL 不支援某些數據類型(比如ORACLE中的Timestamp with local Timezone)的問題。

一、系統環境

  • Spark 版本:2.1.0.cloudera1
  • JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131
  • ORACLE JDBC driver 版本:ojdbc7.jar
  • Scala 版本:2.11.8

二、Spark SQL讀資料庫表遇到的不支援某些數據類型

Spark SQL 讀取傳統的關係型資料庫同樣需要用到 JDBC,畢竟這是提供的訪問資料庫官方 API。Spark要讀取資料庫需要解決兩個問題:

  • 分散式讀取;
  • 原始表數據到DataFrame的映射。

2.1 業務程式碼

public class Config {    // spark-jdbc parameter names    public static String JDBC_PARA_URL = "url";    public static String JDBC_PARA_USER = "user";    public static String JDBC_PARA_PASSWORD = "password";    public static String JDBC_PARA_DRIVER = "driver";    public static String JDBC_PARA_TABLE = "dbtable";    public static String JDBC_PARA_FETCH_SIZE = "fetchsize";  }  
import org.apache.spark.SparkContext  import org.apache.spark.rdd.RDD  import org.apache.spark.sql._    // 主類  object Main {      def main(args: Array[String]): Unit = {      val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate()      val sqlContext = sparkSession.sqlContext      val sc = sparkSession.sparkContext      val partitionNum = 16      val fetchSize = 1000      val jdbcUrl = "..."      val userName = "..."      val schema_table = "..."      val password = "..."      val jdbcDriver = "oracle.jdbc.driver.OracleDriver"      // 注意需要將oracle jdbc driver jar放置在spark lib jars目錄下,或者spark2-submit提交spark application時添加--jars參數      val jdbcDF = sqlContext.read.format("jdbc").options(            Map(Config.JDBC_PARA_URL -> jdbcUrl,              Config.JDBC_PARA_USER -> userName,              Config.JDBC_PARA_TABLE -> schema_table,              Config.JDBC_PARA_PASSWORD -> password,              Config.JDBC_PARA_DRIVER -> jdbcDriver,              Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()      val rdd = jdbcDF.rdd      rdd.count()      ......  }  

2.2 部分數據類型不支援

比如ORACLE中的Timestamp with local TimezoneFLOAT(126)


三、解決方法:自定義JdbcDialects

3.1 什麼是JdbcDialects ?

Spark SQL 中的 org.apache.spark.sql.jdbc package 中有個類 JdbcDialects.scala,該類定義了Spark DataType 和 SQLType 之間的映射關係,分析該類的源碼可知,該類是一個抽象類,包含以下幾個方法:

  • def canHandle(url : String):判斷該JdbcDialect 實例是否能夠處理該jdbc url;
  • getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):輸入資料庫中的SQLType,得到對應的Spark DataType的mapping關係;
  • getJDBCType(dt: DataType):輸入Spark 的DataType,得到對應的資料庫的SQLType;
  • quoteIdentifier(colName: String):引用標識符,用來放置某些欄位名用了資料庫的保留字(有些用戶會使用資料庫的保留字作為列名);
  • 其他……。

該類還有一個伴生對象,其中包含3個方法:

  • get(url: String):根據database的url獲取JdbcDialect 對象;
  • unregisterDialect(dialect: JdbcDialect):將已註冊的JdbcDialect 註銷;
  • registerDialect(dialect: JdbcDialect):註冊一個JdbcDialect。

3.2 解決步驟

  1. 使用get(url: String)方法獲取當前的 JdbcDialect 對象;
  2. 將當前的 JdbcDialect 對象 unregistered 掉;
  3. new 一個 JdbcDialect 對象,並重寫方法(主要是getCatalystType()方法,因為其定義了資料庫 SQLType 到 Spark DataType 的映射關係),修改映射關係,將不支援的 SQLType 以其他的支援的數據類型返回比如StringType,這樣就能夠解決問題了;
  4. register新創建的 JdbcDialect 對象

3.3 解決方案的業務程式碼

object SaicSparkJdbcDialect {        def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={        val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])        // 將當前的 JdbcDialect 對象unregistered掉      val dialect = JdbcDialects      JdbcDialects.unregisterDialect(dialect.get(jdbcUrl))        if (dbType.equals("ORACLE")) {        val OracleDialect = new JdbcDialect {            // 只能處理ORACLE資料庫            override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")            // 修改資料庫 SQLType 到 Spark DataType 的映射關係(從資料庫讀取到Spark中)            override def getCatalystType(sqlType: Int, typeName: String, size: Int,                                         md: MetadataBuilder): Option[DataType] = {              if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) {                // 將不支援的 Timestamp with local Timezone 以TimestampType形式返回                Some(TimestampType)              } else if (sqlType == Types.BLOB) {                Some(BinaryType)              } else {                Some(StringType)              }            }            // 該方法定義的是資料庫Spark DataType 到 SQLType 的映射關係,此處不需要做修改            override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {              case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))              case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))              case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))              case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))              case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))              case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))              case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))              case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))              case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))              case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))              case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))              case _ => None            }            override def quoteIdentifier(colName: String): String = {              colName            }          }          // register新創建的 JdbcDialect 對象          JdbcDialects.registerDialect(OracleDialect)        }  

本文來自:https://www.jianshu.com/p/20b82891aac9