restapi(8)- restapi-sql:用戶自主的服務

  • 2019 年 10 月 28 日
  • 筆記

  學習函數式編程初衷是看到自己熟悉的oop程式語言和sql資料庫在現代商業社會中前景暗淡,準備完全放棄windows技術棧轉到分散式大數據技術領域的。但是在現實中理想總是不如人意,本來想在一個規模較小的公司展展拳腳,以為小公司會少點歷史包袱,有利於全面技術改造。但現實是:即使是小公司,一旦有個成熟的產品,那麼進行全面的技術更新基本上是不可能的了,因為公司要生存,開發人員很難新舊技術之間隨時切換。除非有狂熱的熱情,員工怠慢甚至抵制情緒不容易解決。只能採取逐步切換方式:保留原有產品的後期維護不動,新產品開發用一些新的技術。在我們這裡的情況就是:以前一堆c#、sqlserver的東西必須保留,新的功能比如大數據、ai、識別等必須用新的手段如scala、python、dart、akka、kafka、cassandra、mongodb來開發。好了,新舊兩個開發平台之間的軟體系統對接又變成了一個問題。

   現在我們這裡有個需求:把在linux-ubuntu akka-cluster集群環境里mongodb里數據處理的結果傳給windows server下SQLServer里。這是一種典型的異系統集成場景。我的解決方案是通過一個restapi服務作為兩個系統的數據橋樑,這個restapi的最基本要求是:

1、支援任何作業系統前端:這個沒什麼問題,在http層上通過json交換數據

2、能讀寫mongodb:在前面討論的restapi-mongo已經實現了這一功能

3、能讀寫windows server環境下的sqlserver:這個是本篇討論的主題

4、用戶能夠比較方便的對平台資料庫進行操作,最好免去前後雙方每類操作都需要進行協定model這一過程,也就是能達到用戶隨意調用服務

前面曾經實現了一個jdbc-engine項目,基於scalikejdbc,不過只示範了slick-h2相關的功能。現在需要sqlserver-jdbc驅動,然後試試能不能在JVM里驅動windows下的sqlserver。maven里找不到sqlserver的驅動,但從微軟官網可以下載mssql-jdbc-7.0.0.jre8.jar。這是個jar,在sbt里稱作unmanagedjar,不能擺在build.sbt的dependency里。這個需要擺在項目根目錄下的lib目錄下即可(也可以在放在build.sbt里unmanagedBase :=?? 指定的路徑下)。然後是資料庫連接,下面是可以使用sqlserver的application.conf配置文件內容:

# JDBC settings  prod {    db {      h2 {        driver = "org.h2.Driver"        url = "jdbc:h2:tcp://localhost/~/slickdemo"        user = ""        password = ""        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true      }      mysql {        driver = "com.mysql.cj.jdbc.Driver"        url = "jdbc:mysql://localhost:3306/testdb"        user = "root"        password = "123"        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true      }      postgres {        driver = "org.postgresql.Driver"        url = "jdbc:postgresql://localhost:5432/testdb"        user = "root"        password = "123"        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true      }      mssql {        driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"        url = "jdbc:sqlserver://192.168.11.164:1433;integratedSecurity=false;Connect Timeout=3000"        user = "sa"        password = "Tiger2020"        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true        connectionTimeout = 3000      }      termtxns {        driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"        url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=TERMTXNS;integratedSecurity=false;Connect Timeout=3000"        user = "sa"        password = "Tiger2020"        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true        connectionTimeout = 3000      }      crmdb {        driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"        url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=CRMDB;integratedSecurity=false;Connect Timeout=3000"        user = "sa"        password = "Tiger2020"        poolFactoryName = "hikaricp"        numThreads = 10        maxConnections = 12        minConnections = 4        keepAliveConnection = true        connectionTimeout = 3000      }    }    # scallikejdbc Global settings    scalikejdbc.global.loggingSQLAndTime.enabled = true    scalikejdbc.global.loggingSQLAndTime.logLevel = info    scalikejdbc.global.loggingSQLAndTime.warningEnabled = true    scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000    scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn    scalikejdbc.global.loggingSQLAndTime.singleLineMode = false    scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false    scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10  }

這個文件里的mssql,termtxns,crmdb段落都是給sqlserver的,它們都使用hikaricp執行緒池管理。

在jdbc-engine里啟動資料庫方式如下:

  ConfigDBsWithEnv("prod").setup('termtxns)    ConfigDBsWithEnv("prod").setup('crmdb)    ConfigDBsWithEnv("prod").loadGlobalSettings()

這段打開了在配置文件中用termtxns,crmdb註明的資料庫。

下面是SqlHttpServer.scala的程式碼:

package com.datatech.rest.sql  import akka.http.scaladsl.Http  import akka.http.scaladsl.server.Directives._  import pdi.jwt._  import AuthBase._  import MockUserAuthService._  import com.datatech.sdp.jdbc.config.ConfigDBsWithEnv    import akka.actor.ActorSystem  import akka.stream.ActorMaterializer    import Repo._  import SqlRoute._    object SqlHttpServer extends App {      implicit val httpSys = ActorSystem("sql-http-sys")    implicit val httpMat = ActorMaterializer()    implicit val httpEC = httpSys.dispatcher      ConfigDBsWithEnv("prod").setup('termtxns)    ConfigDBsWithEnv("prod").setup('crmdb)    ConfigDBsWithEnv("prod").loadGlobalSettings()      implicit val authenticator = new AuthBase()      .withAlgorithm(JwtAlgorithm.HS256)      .withSecretKey("OpenSesame")      .withUserFunc(getValidUser)      val route =      path("auth") {        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>          post { complete(authenticator.issueJwt(userinfo))}        }      } ~        pathPrefix("api") {          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { token =>            new SqlRoute("sql", token)(new JDBCRepo)              .route            // ~ ...          }        }      val (port, host) = (50081,"192.168.11.189")      val bindingFuture = Http().bindAndHandle(route,host,port)      println(s"Server running at $host $port. Press any key to exit ...")      scala.io.StdIn.readLine()      bindingFuture.flatMap(_.unbind())      .onComplete(_ => httpSys.terminate())    }

服務入口在http://mydemo.com/api/sql,服務包括get,post,put三類,參考這個SqlRoute:

package com.datatech.rest.sql  import akka.http.scaladsl.server.Directives  import akka.stream.ActorMaterializer  import akka.http.scaladsl.model._  import akka.actor.ActorSystem  import com.datatech.rest.sql.Repo.JDBCRepo  import akka.http.scaladsl.common._  import spray.json.DefaultJsonProtocol  import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport    trait JsFormats extends SprayJsonSupport with DefaultJsonProtocol  object JsConverters extends JsFormats {    import SqlModels._    implicit val brandFormat = jsonFormat2(Brand)    implicit val customerFormat = jsonFormat6(Customer)  }    object SqlRoute {    import JsConverters._    implicit val jsonStreamingSupport = EntityStreamingSupport.json()      .withParallelMarshalling(parallelism = 8, unordered = false)      class SqlRoute(val pathName: String, val jwt: String)(repo: JDBCRepo)(    implicit  sys: ActorSystem, mat: ActorMaterializer) extends Directives with JsonConverter {      val route = pathPrefix(pathName) {        path(Segment / Remaining) { case (db, tbl) =>          (get & parameter('sqltext)) { sql => {            val rsc = new RSConverter            val rows = repo.query[Map[String,Any]](db, sql, rsc.resultSet2Map)            complete(rows.map(m => toJson(m)))          }          } ~ (post & parameter('sqltext)) { sql =>                entity(as[String]){ json =>                  repo.batchInsert(db,tbl,sql,json)                  complete(StatusCodes.OK)                }          } ~ put {            entity(as[Seq[String]]) { sqls =>              repo.update(db, sqls)              complete(StatusCodes.OK)            }          }        }      }    }  }

jdbc-engine的特點是可以用字元類型的sql語句來操作。所以我們可以通過傳遞字元串型的sql語句來實現服務調用,使用門檻低,方便通用。restapi-sql提供的是對伺服器端sqlserver的普通操作,包括讀get,寫入post,更改put。這些sqlserver操作部分是在JDBCRepo里的:

package com.datatech.rest.sql  import com.datatech.sdp.jdbc.engine.JDBCEngine._  import com.datatech.sdp.jdbc.engine.{JDBCQueryContext, JDBCUpdateContext}  import scalikejdbc._  import akka.stream.ActorMaterializer  import com.datatech.sdp.result.DBOResult.DBOResult  import akka.stream.scaladsl._  import scala.concurrent._  import SqlModels._    object Repo {      class JDBCRepo(implicit ec: ExecutionContextExecutor, mat: ActorMaterializer) {      def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {        //construct the context        val ctx = JDBCQueryContext(          dbName = Symbol(db),          statement = sqlText        )        jdbcAkkaStream(ctx,toRow)      }        def query(db: String, tbl: String, sqlText: String) = {        //construct the context        val ctx = JDBCQueryContext(          dbName = Symbol(db),          statement = sqlText        )        jdbcQueryResult[Vector,RS](ctx,getConverter(tbl)).toFuture[Vector[RS]]      }        def update(db: String, sqlTexts: Seq[String]): DBOResult[Seq[Long]] = {        val ctx = JDBCUpdateContext(          dbName = Symbol(db),          statements = sqlTexts        )        jdbcTxUpdates(ctx)      }      def bulkInsert[P](db: String, sqlText: String, prepParams: P => Seq[Any], params: Source[P,_]) = {        val insertAction = JDBCActionStream(          dbName = Symbol(db),          parallelism = 4,          processInOrder = false,          statement = sqlText,          prepareParams = prepParams        )        params.via(insertAction.performOnRow).to(Sink.ignore).run()      }      def batchInsert(db: String, tbl: String, sqlText: String, jsonParams: String):DBOResult[Seq[Long]] = {        val ctx = JDBCUpdateContext(          dbName = Symbol(db),          statements = Seq(sqlText),          batch = true,          parameters = getSeqParams(jsonParams,sqlText)        )        jdbcBatchUpdate[Seq](ctx)      }    }    import monix.execution.Scheduler.Implicits.global    implicit class DBResultToFuture(dbr: DBOResult[_]){      def toFuture[R] = {        dbr.value.value.runToFuture.map {          eor =>            eor match {              case Right(or) => or match {                case Some(r) => r.asInstanceOf[R]                case None => throw new RuntimeException("Operation produced None result!")              }              case Left(err) => throw new RuntimeException(err)            }        }      }    }  }

讀query部分即 def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {…} 這個函數返回Source[R,Any],下面我們好好談談這個R:R是讀的結果,通常是某個類或model,比如讀取Person記錄返回一組Person類的實例。這裡有一種強類型的感覺。一開始我也是隨大流堅持建model後用toJson[E],fromJson[E]這樣做線上數據轉換。現在的問題是restapi-sql是一項公共服務,使用者知道sqlserver上有些什麼表,然後希望通過sql語句來從這些表裡讀取數據。這些sql語句可能超出表的界限如sql join, union等,如果我們堅持每個返回結果都必須有個對應的model,那麼顯然就會犧牲這個服務的通用性。實際上,http線上數據交換本身就不可能是強類型的,因為經過了json轉換。對於json轉換來說,只要求欄位名稱、欄位類型對稱就行了。至於從什麼類型轉換成了另一個什麼類型都沒問題。所以,欄位名+欄位值的表現形式不就是Map[K,V]嗎,我們就用Map[K,V]作為萬能model就行了,沒人知道。也就是說用戶方通過sql語句指定返回的欄位名稱,它們可能是任何類型Any,具體類型自然會由資料庫補上。服務方從資料庫讀取結果ResultSet後轉成Map[K,V]然後再轉成json返回給用戶,用戶可以用Map[String,Any]資訊產生任何類型,這就是自主。好,就來看看如何將ResultSet轉成Map[String,Any]:

package com.datatech.rest.sql  import scalikejdbc._  import java.sql.ResultSetMetaData  class RSConverter {    import RSConverterUtil._    var rsMeta: ResultSetMetaData = _    var columnCount: Int = 0    var rsFields: List[(String,String)] = List[(String,String)]()      def getFieldsInfo:List[(String,String)] =      ( 1 until columnCount).foldLeft(List[(String,String)]()) {      case (cons,i) =>        (rsMeta.getColumnLabel(i) -> rsMeta.getColumnTypeName(i)) :: cons    }    def resultSet2Map(rs: WrappedResultSet): Map[String,Any] = {      if(columnCount == 0) {        rsMeta =  rs.underlying.getMetaData        columnCount = rsMeta.getColumnCount        rsFields = getFieldsInfo      }      rsFields.foldLeft(Map[String,Any]()) {        case (m,(n,t)) =>          m + (n -> rsFieldValue(n,t,rs))      }    }  }  object RSConverterUtil {    import scala.collection.immutable.TreeMap    def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {      val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {        case (t,(k,v)) => t + (stm.indexOfSlice(k) -> v)      }      sortedParams.map(_._2).toSeq    }    def rsFieldValue(fldname: String, fldType: String, rs: WrappedResultSet): Any = fldType match {      case "LONGVARCHAR" => rs.string(fldname)      case "VARCHAR" => rs.string(fldname)      case "CHAR" => rs.string(fldname)      case "BIT" => rs.boolean(fldname)      case "TIME" => rs.time(fldname)      case "TIMESTAMP" => rs.timestamp(fldname)      case "ARRAY" => rs.array(fldname)      case "NUMERIC" => rs.bigDecimal(fldname)      case "BLOB" => rs.blob(fldname)      case "TINYINT" => rs.byte(fldname)      case "VARBINARY" => rs.bytes(fldname)      case "BINARY" => rs.bytes(fldname)      case "CLOB" => rs.clob(fldname)      case "DATE" => rs.date(fldname)      case "DOUBLE" => rs.double(fldname)      case "REAL" => rs.float(fldname)      case "FLOAT" => rs.float(fldname)      case "INTEGER" => rs.int(fldname)      case "SMALLINT" => rs.int(fldname)      case "Option[Int]" => rs.intOpt(fldname)      case "BIGINT" => rs.long(fldname)    }  }

這段主要功能是將JDBC的ResultSet轉換成Map[String,Any]。在前面討論的restapi-mongo我們可以進行Document到Map[String,Any]的轉換以實現同樣的目的。

下面是個調用query服務的例子:

    val getAllRequest = HttpRequest(        HttpMethods.GET,        uri = "http://192.168.11.189:50081/api/sql/termtxns/brand?sqltext=SELECT%20*%20FROM%20BRAND",      ).addHeader(authentication)        (for {        response <- Http().singleRequest(getAllRequest)        json <- Unmarshal(response.entity).to[String]      } yield message).andThen {        case Success(msg) => println(s"Received json collection: $json")        case Failure(err) => println(s"Error: ${err.getMessage}")      }

特點是我只需要提供sql語句,服務就會返回一個json數組,然後我怎麼把json轉成任何類型就隨我高興了。

再看看post服務:在這裡希望實現一種批次型插入表的功能,比如從一個數據表裡把數據搬到另外一個表。一般來講在jdbc操作里首先得提供一個模版,如:insert into person(fullname,code) values(?,?),然後通過提供一組參數值來實現批次插入。當然,為安全起見,我們還是需要確定正確的參數位置,這個可以從sql語句里獲取:

  def map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {      val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {        case (t,(k,v)) => t + (stm.toUpperCase.indexOfSlice(k.toUpperCase) -> v)      }      sortedParams.map(_._2).toSeq    }      def getSeqParams(json: String, sql: String): Seq[Seq[Any]] = {      val seqOfjson = fromJson[Seq[String]](json)      val prs = seqOfjson.map(fromJson[Map[String,Any]])      prs.map(RSConverterUtil.map2Params(sql,_))    }

下面是個批次插入的示範程式碼:

    val encodedSelect = URLEncode.encode("select id as code, name as fullname from members")      val encodedInsert = URLEncode.encode("insert into person(fullname,code) values(?,?)")      val getMembers = HttpRequest(         HttpMethods.GET,         uri = "http://192.168.0.189:50081/api/sql/h2/members?sqltext="+encodedSelect        ).addHeader(authentication)      val postRequest = HttpRequest(        HttpMethods.POST,        uri = "http://192.168.0.189:50081/api/sql/h2/person?sqltext="+encodedInsert,      ).addHeader(authentication)        (for {        _ <- update("http://192.168.0.189:50081/api/sql/h2/person",Seq(createCTX))        respMembers <- Http().singleRequest(getMembers)        message <- Unmarshal(respMembers.entity).to[String]        reqEntity <- Marshal(message).to[RequestEntity]        respInsert <- Http().singleRequest(postRequest.copy(entity = reqEntity))   //       HttpEntity(ContentTypes.`application/json`,ByteString(message))))      } yield respInsert).onComplete {        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>          println("builk insert successful!")        case Success(_) => println("builk insert failed!")        case Failure(err) => println(s"Error: ${err.getMessage}")      }

你看,我特別把參數值清單里欄位位置和insert sql里欄位先後位置顛倒了,但還是得到正確的結果。

最後是put:這是為批次型的事物處理設計的。接受一條或者多條無參數sql指令,多條指令會在一個事物中執行。具體使用方式如下:

    def update(url: String, cmds: Seq[String])(implicit token: Authorization): Future[HttpResponse] =      for {        reqEntity <- Marshal(cmds).to[RequestEntity]        response <- Http().singleRequest(HttpRequest(          method=HttpMethods.PUT,uri=url,entity=reqEntity)        .addHeader(token))      } yield response

在上面的討論里介紹了基於sqlserver的rest服務,與前面討論的restapi-mongo從原理上區別並不大,重點是實現了用戶主導的資料庫操作。