Spark和Spring整合處理離線數據

如果你比較熟悉JavaWeb應用開發,那麼對Spring框架一定不陌生,並且JavaWeb通常是基於SSM搭起的架構,主要用Java語言開發。但是開發Spark程序,Scala語言往往必不可少。

眾所周知,Scala如同Java一樣,都是運行在JVM上的,所以它具有很多Java語言的特性,同時作為函數式編程語言,又具有自己獨特的特性,實際應用中除了要結合業務場景,還要對Scala語言的特性有深入了解。

如果想像使用Java語言一樣,使用Scala來利用Spring框架特性、並結合Spark來處理離線數據,應該怎麼做呢?

本篇文章,通過詳細的示例代碼,介紹上述場景的具體實現,大家如果有類似需求,可以根據實際情況做調整。

 

1. 定義一個程序啟動入口

object Bootstrap {
  private val log = LoggerFactory.getLogger(Bootstrap.getClass)

  //指定配置文件如log4j的路徑
  val ConfFileName = "conf"
  val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0
  else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator

  //存放實現了StatsTask的離線程序處理的類
  private val TASK_MAP = Map("WordCount" -> classOf[WordCount])

  def main(args: Array[String]): Unit = {
    //傳入一些參數,比如要運行的離線處理程序類名、處理哪些時間的數據
    if (args.length < 1) {
      log.warn("args 參數異常!!!" + args.toBuffer)
      System.exit(1)
    }
    init(args)
  }

  def init(args: Array[String]) {
    try {
      SpringUtils.init(Array[String]("applicationContext.xml"))
      initLog4j()

      val className = args(0)
      // 實例化離線處理類
      val task = SpringUtils.getBean(TASK_MAP(className))

      args.length match {
        case 3 =>
          // 處理一段時間的每天離線數據
          val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))
          val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))
          val days = Days.daysBetween(dtStart, dtEnd).getDays + 1
          for (i <- 0 until days) {
            val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")
            task.runTask(etime)

            log.info(s"JOB --> $className 已成功處理: $etime 的數據")
          }

        case 2 =>
          // 處理指定的某天離線數據
          val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")
          task.runTask(etime)
          log.info(s"JOB --> $className 已成功處理: $etime 的數據")

        case 1 =>
          // 處理前一天離線數據
          val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")
          task.runTask(etime)
          log.info(s"JOB --> $className 已成功處理: $etime 的數據")

        case _ => println("執行失敗 args參數:" + args.toBuffer)
      }
    } catch {
      case e: Exception =>
        println("執行失敗 args參數:" + args.toBuffer)
        e.printStackTrace()
    }

    // 初始化log4j
    def initLog4j() {
      val fileName = ConfigurePath + "log4j.properties"
      if (new File(fileName).exists) {
        PropertyConfigurator.configure(fileName)
        log.info("日誌log4j已經啟動")
      }
    }
  }
}

 

2. 加載Spring配置文件工具類

 

object SpringUtils {
  private var context: ClassPathXmlApplicationContext = _

  def getBean(name: String): Any = context.getBean(name)

  def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)

  def getBean[T](_class: Class[T]): T = context.getBean(_class)

  def init(springXml: Array[String]): Unit = {
    if (springXml == null || springXml.isEmpty) {
      try
        throw new Exception("springXml 不可為空")
      catch {
        case e: Exception => e.printStackTrace()
      }
    }
    context = new ClassPathXmlApplicationContext(springXml(0))
    context.start()
  }

}

 

 

3. Spring配置文件applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="//www.springframework.org/schema/beans"
       xmlns:context="//www.springframework.org/schema/context"
       xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="//www.springframework.org/schema/beans //www.springframework.org/schema/beans/spring-beans-4.0.xsd
          //www.springframework.org/schema/context  //www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- 配置包掃描 -->
    <context:component-scan base-package="com.bigdata.stats"/>

</beans>

 

4. 定義一個trait,作為離線程序的公共”父類”

 

trait StatsTask extends Serializable {
  //"子類"繼承StatsTask重寫該方法實現自己的業務處理邏輯 
  def runTask(etime: String)
}

 

 

5. 繼承StatsTask的離線處理類

//不要忘記添加 @Component ,否則無法利用Spring對WordCount進行實例化
@Component
class WordCount extends StatsTask {

  override def runTask(etime: String): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    import sparkSession.implicits._

    val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" "))
      .toDF("word")

    words.createOrReplaceTempView("wordcount")

    val df = sparkSession.sql("select word, count(*) count from wordcount group by word")

    df.show()
  }
}

 

推薦文章:

Spark流式狀態管理

Spark實現推薦系統中的相似度算法

Scala中的IO操作及ArrayBuffer線程安全問題

學好Spark必須要掌握的Scala技術點


 

關注微信公眾號:大數據學習與分享,獲取更對技術乾貨