Spark SQL讀資料庫時不支援某些數據類型的問題
- 2019 年 12 月 5 日
- 筆記
在大數據平台中,經常需要做數據的ETL,從傳統關係型資料庫RDBMS中抽取數據到HDFS中。之前開發數據湖新版本時使用Spark SQL來完成ETL的工作,但是遇到了 Spark SQL 不支援某些數據類型(比如ORACLE中的Timestamp with local Timezone)的問題。
一、系統環境
- Spark 版本:2.1.0.cloudera1
- JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131
- ORACLE JDBC driver 版本:ojdbc7.jar
- Scala 版本:2.11.8
二、Spark SQL讀資料庫表遇到的不支援某些數據類型
Spark SQL 讀取傳統的關係型資料庫同樣需要用到 JDBC,畢竟這是提供的訪問資料庫官方 API。Spark要讀取資料庫需要解決兩個問題:
- 分散式讀取;
- 原始表數據到DataFrame的映射。
2.1 業務程式碼
public class Config { // spark-jdbc parameter names public static String JDBC_PARA_URL = "url"; public static String JDBC_PARA_USER = "user"; public static String JDBC_PARA_PASSWORD = "password"; public static String JDBC_PARA_DRIVER = "driver"; public static String JDBC_PARA_TABLE = "dbtable"; public static String JDBC_PARA_FETCH_SIZE = "fetchsize"; }
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._ // 主類 object Main { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate() val sqlContext = sparkSession.sqlContext val sc = sparkSession.sparkContext val partitionNum = 16 val fetchSize = 1000 val jdbcUrl = "..." val userName = "..." val schema_table = "..." val password = "..." val jdbcDriver = "oracle.jdbc.driver.OracleDriver" // 注意需要將oracle jdbc driver jar放置在spark lib jars目錄下,或者spark2-submit提交spark application時添加--jars參數 val jdbcDF = sqlContext.read.format("jdbc").options( Map(Config.JDBC_PARA_URL -> jdbcUrl, Config.JDBC_PARA_USER -> userName, Config.JDBC_PARA_TABLE -> schema_table, Config.JDBC_PARA_PASSWORD -> password, Config.JDBC_PARA_DRIVER -> jdbcDriver, Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load() val rdd = jdbcDF.rdd rdd.count() ...... }
2.2 部分數據類型不支援
比如ORACLE中的Timestamp with local Timezone 和 FLOAT(126)。
三、解決方法:自定義JdbcDialects
3.1 什麼是JdbcDialects ?
Spark SQL 中的 org.apache.spark.sql.jdbc package 中有個類 JdbcDialects.scala,該類定義了Spark DataType 和 SQLType 之間的映射關係,分析該類的源碼可知,該類是一個抽象類,包含以下幾個方法:
- def canHandle(url : String):判斷該JdbcDialect 實例是否能夠處理該jdbc url;
- getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):輸入資料庫中的SQLType,得到對應的Spark DataType的mapping關係;
- getJDBCType(dt: DataType):輸入Spark 的DataType,得到對應的資料庫的SQLType;
- quoteIdentifier(colName: String):引用標識符,用來放置某些欄位名用了資料庫的保留字(有些用戶會使用資料庫的保留字作為列名);
- 其他……。
該類還有一個伴生對象,其中包含3個方法:
- get(url: String):根據database的url獲取JdbcDialect 對象;
- unregisterDialect(dialect: JdbcDialect):將已註冊的JdbcDialect 註銷;
- registerDialect(dialect: JdbcDialect):註冊一個JdbcDialect。
3.2 解決步驟
- 使用get(url: String)方法獲取當前的 JdbcDialect 對象;
- 將當前的 JdbcDialect 對象 unregistered 掉;
- new 一個 JdbcDialect 對象,並重寫方法(主要是getCatalystType()方法,因為其定義了資料庫 SQLType 到 Spark DataType 的映射關係),修改映射關係,將不支援的 SQLType 以其他的支援的數據類型返回比如StringType,這樣就能夠解決問題了;
- register新創建的 JdbcDialect 對象。
3.3 解決方案的業務程式碼
object SaicSparkJdbcDialect { def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={ val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect]) // 將當前的 JdbcDialect 對象unregistered掉 val dialect = JdbcDialects JdbcDialects.unregisterDialect(dialect.get(jdbcUrl)) if (dbType.equals("ORACLE")) { val OracleDialect = new JdbcDialect { // 只能處理ORACLE資料庫 override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") // 修改資料庫 SQLType 到 Spark DataType 的映射關係(從資料庫讀取到Spark中) override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) { // 將不支援的 Timestamp with local Timezone 以TimestampType形式返回 Some(TimestampType) } else if (sqlType == Types.BLOB) { Some(BinaryType) } else { Some(StringType) } } // 該方法定義的是資料庫Spark DataType 到 SQLType 的映射關係,此處不需要做修改 override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC)) case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC)) case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC)) case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC)) case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC)) case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC)) case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC)) case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) case _ => None } override def quoteIdentifier(colName: String): String = { colName } } // register新創建的 JdbcDialect 對象 JdbcDialects.registerDialect(OracleDialect) }
本文來自:https://www.jianshu.com/p/20b82891aac9