Spark系列–OutputFormat 詳解

  • 2019 年 10 月 30 日
  • 筆記

前言

本文主要內容

  1. 什麼是OutputFormat及其運行機制?
  2. 如何自定義自己的OutputFormat?
  3. 實戰自定義mysql OutputFormat。

一丶什麼是OutputFormat?

定義了 spark 的輸出規則的類。這也許會讓你想到 Hadoop Mapreduce 的 OutputFormat,沒錯,其實他們是一個東西,嗯,完全一樣。Spark 本身只是一個計算框架,其輸入和輸出都是依賴於 Hadoop 的 OutputFormat,但是因為 Spark 本身自帶 Hadoop 相關 Jar 包,所以不需要我們額外考慮這些東西,下面我們以saveAsTextFile源碼來驗證我們的結論

 def saveAsTextFile(path: String): Unit = withScope {      val nullWritableClassTag = implicitly[ClassTag[NullWritable]]      val textClassTag = implicitly[ClassTag[Text]]      val r = this.mapPartitions { iter =>        val text = new Text()        iter.map { x =>          text.set(x.toString)          (NullWritable.get(), text)        }      }      //最後調用的 saveAsHadoopFile()  並且泛型是 org.apache.hadoop.mapred.TextOutputFormat,      //是屬於 hadoop 包下的一個outputformat,以此簡單來驗證我們的結論      RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)        .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)    }

二丶OutputFormat運行機制?

我們知道 Spark 是分佈式計算框架,其計算是一個個 Executor 為單元進行的,當運行到 類似於 saveAsTextFile等輸出型算子時,會根據其定義的 Outputformat 規則進行輸出,在每個Executor 單元內的每個task有且只有一個 Outputformat 實例

三丶自定義 OutputFormat 解析

首先我們來看一下 OutputFormat 接口

public interface OutputFormat<K, V> {      /**     * 根據給予的參數返回一個 RecordWriter 對象     *     * @param ignored 基本沒什麼用     * @param job 可以用來獲取各種配置,定製特別的 RecordWriter     * @param name 一個唯一的名字,比如:part-0001     * @param progress mechanism for reporting progress while writing to file.     */    RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,                                       String name, Progressable progress)    throws IOException;      /**     * 用來做輸出前的各種檢查     */    void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;    //獲取一個 OutputCommitter,用來保證輸出的正確執行    public abstract  OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;  }

checkOutputSpecs很好理解,用來做輸出前的檢查,比如 Spark 會對輸出路徑做檢查,如果存在就拋出異常,那麼接下來我們先理解下 RecordWriter 和 OutputCommitter

  • RecordWriter
public abstract class RecordWriter<K, V> {    /**     * outputformat 是針對於 kv格式的RDD的,     * Rdd數據的每條記錄都會調用一次 write 方法 用來寫入數據     */    public abstract void write(K key, V value                               ) throws IOException, InterruptedException;      /**     * 在數據寫完之後,會進行調用,一般執行一些 IO 的 close 操作     */    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;  }

這裡我們可以發現,如果你不是 KV 格式的 Rdd,那麼能調用的只有有限的幾個輸出型算子,比如saveAsTextFile,其實底層是給你加格式化成了 kv 格式 Rdd 的,其 key 為 NullWritable,這塊一般是我們自定義的重點。

  • OutputCommitter
package com.inveno.data.analysis.user.statistical;  import java.io.IOException;  import org.apache.hadoop.classification.InterfaceAudience;  import org.apache.hadoop.classification.InterfaceStability;    public abstract class OutputCommitter {      /**       * 每個job執行之前都會調用一次或者多次,用來進行一些初始化操作       */      public abstract void setupJob(JobContext jobContext) throws IOException;        /**       * 每個job執行之後都會調用一次或者多次,用來進行一些初始化操作       */      @Deprecated      public void cleanupJob(JobContext jobContext) throws IOException {      }        /**       * 每個job執行完成都會調用一次       */      public void commitJob(JobContext jobContext) throws IOException {          cleanupJob(jobContext);      }          /**       * 每個job中斷執行會調用一次或者多次       */      public void abortJob(JobContext jobContext, JobStatus.State state)              throws IOException {          cleanupJob(jobContext);      }        /**       * 每個 task 執行之前都會調用一次或者多次,用來進行一些初始化操作       */      public abstract void setupTask(TaskAttemptContext taskContext)              throws IOException;        /**       * 需要輸出到 hdfs 上的 task 用來檢測是否有輸出需要提交       */      public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)              throws IOException;        /**       * 每個 needsTaskCommit 為 true 的 task 執行完成都會調用一次或者多次       */      public abstract void commitTask(TaskAttemptContext taskContext)              throws IOException;        /**       * task 中斷會被調用一次或多次       */      public abstract void abortTask(TaskAttemptContext taskContext)              throws IOException;        /**       * 是否支持輸出恢復       */      public boolean isRecoverySupported() {          return false;      }        /**       * 恢復task輸出       */      public void recoverTask(TaskAttemptContext taskContext)              throws IOException {      }  }

其中代碼注釋說的調用多次,一般都是因為重試機制導致的,一般只會調用一次,這個我們一般使用系統自帶的實現類,然後在各個生命周期添加一些自定義操作。

四丶實戰—定義一個自己的 MysqlOutputFormat

  1. 每當你想自定義一個東西,第一步應該想的是:我有這個需求,別人有沒有?我是不是在重複造輪子?別人的輪子適合我嗎?我可以做的更好嗎?
  2. 有了上面的思考,我們果斷在源碼包裏面找到了一個叫做 DBOutputFormat的類,輪子果然是有的,那麼好不好用呢?能不能優化一下呢?
  3. ok,廢話不多說了,我們來看看今天我們自定義的 MysqlOutputFormat,因為要用在 Spark 上 所以我們使用的是 Scala 語言
abstract class MysqlOutputFormat[K, V]() extends OutputFormat[K, V] {      val logger = LoggerFactory.getLogger(getClass)    //直接返回一個 MysqlWriter 對象    override def getRecordWriter(taskAttemptContext: TaskAttemptContext): RecordWriter[K, V] = {      new MysqlWriter[K, V](getDBFlag(), getValueConvert(), taskAttemptContext)    }    //空實現,這裡可以根據你的需求實現,比如刪除一些老舊數據    override def checkOutputSpecs(jobContext: JobContext): Unit = {    }  //因為我們數據讀入的KV格式,這裡定義了一個 SQLValueConvert trait,來讓使用者自定義輸入規則    def getValueConvert(): SQLValueConvert[K, V]  //用於給 mysqlwriter 獲取mysql相關參數的 flag    def getDBFlag(): String      //我們這裡直接使用系統自帶的就ok了,可以根據你的需求來做相關修改    override def getOutputCommitter(taskAttemptContext: TaskAttemptContext): OutputCommitter = {      new FileOutputCommitter(null, taskAttemptContext)    }  }

實現比較簡單,值得注意的是,在 Spark 中 OutputFormat 是通過反射生產的實例,所以需要提供一個無參的構造方法。那麼接下來我們看看最重要的部分 MysqlWriter

class MysqlWrite[K, V](db_flag: String, converter: SQLValueConvert[K, V], context: TaskAttemptContext) extends RecordWriter[K, V] {    val logger = LoggerFactory.getLogger(getClass)    //加載resource mysql配置文件    val conf: Configuration = context.getConfiguration    conf.addResource("mysql.xml")    //根據傳入的 flag 讀取resource mysql 相應的配置文件    val table: String = conf.get(String.format(JDBCManager.JDBC_TABLE_NAME, db_flag))//table name    private val batch_size = conf.get(String.format(JDBCManager.BATCH, db_flag)).toInt// batch size      var count = 0      var committerStatement: PreparedStatement = _    var conn: Connection = _    //執行批量寫入 mysql    def commit(): Unit = {      if (conn == null || committerStatement == null) {        return      }      try {        committerStatement.executeBatch()        conn.commit()          committerStatement.clearBatch()        count = 0      } catch {        case e: Exception =>          //出錯回滾 並拋出異常          conn.rollback()          logger.error("在writer中寫數據出現異常", e.printStackTrace())          throw e      }    }    //相關資源釋放    override def close(taskAttemptContext: TaskAttemptContext): Unit = {      try {      //提交剩餘的數據        commit()      } catch {        case e: Throwable =>          throw new SQLException()      } finally {        if (committerStatement != null) {          committerStatement.close()        }        if (conn != null) {          conn.close()        }      }    }        override def write(key: K, value: V): Unit = {      if (key == null || value == null) {          return       }      try {        //根據自定義規則 將KV轉換成 array(),        val values = converter.convert(key, value)          //創建數據庫鏈接        if (conn == null) {          conn = JDBCManager.getConnection(conf, db_flag)          conn.setAutoCommit(false)        }        //創建Statement        if (committerStatement == null) {          committerStatement = conn.prepareStatement(          //"INSERT INTO %s VALUES(%s)" 創建 sql 語句            MysqlOperation.insertByParameter(table, values.length))        }      //添加參數        for (i <- values.indices) {          committerStatement.setObject(i + 1, values.apply(i))        }        committerStatement.addBatch()          count = count + 1        //大於batch_size進行提交        if (count >= batch_size) {          commit()        }      } catch {        case e: Throwable =>          println("在writer中寫數據出現異常", e.printStackTrace())          throw new Exception(e)      }    }

上面的代碼都比較簡單,這裡讀者可以思考一下,數據庫的連接是否可以放到 setupTask?提交任務是否可以放到 commitTask ? 這邊 mysql.xml 相關配置就不貼了,項目實際應用過程我們一般都需要將配置屬性寫到額外的文件,方便管理和維護。

五丶額外的思考

能否自定義一個outputformat來實現控制spark 文件的輸出數量呢?這裡主要考慮的多個task同時寫入一個文件,必然涉及到文件的追加,而我們知道 hdfs雖然支持文件的追加,但是性能並不是很好,至於效率到底怎麼樣?筆者也沒驗證過。。。如果你有好的想法,歡迎留言。。。一起討論!!!