restapi (4) - rest-mongo: an HTTP server front end for MongoDB

  • October 3, 2019
  • Notes

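After finishing a standard REST-style HTTP server for database CRUD operations, I found it lacking in many ways. The main cause was chasing "generality": I wanted every service interface to be as generic as possible, and ended up suppressing the strengths of the target database, producing a feeble toy. For example, a standard REST-style getById requires every table to have an id field, which is a bit silly. And the result set returned by get offers no flexible controls such as row count, field selection, or sort order. This matters especially for MongoDB, a distributed database whose query capabilities come close to those of a relational database: as the previous post mentioned, its queries are powerful and its conditions combine flexibly, and it would be a pity not to expose that through the web service API. So this post discusses a REST server built specifically for MongoDB. The goal: the back-end database is MongoDB, an HTTP server exposes CRUD operations on it, and clients call those CRUD services over HTTP. Back-end developers add a new set of CRUD services for each collection by following one uniform standard. I hope this improves development efficiency and reduces the chance of coding errors.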

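MongoDB is a document database, so its data formats are more varied. In this demonstration I want to cover MongoDB's distinctive data types and how to handle them, including dates and binary blobs (images). As an aside, ordinary large text files can also be stored in MongoDB as binary blobs: a file has to travel over HTTP as bytes, so what the back-end HTTP server receives is a sequence of bytes that can be written straight into a MongoDB blob field with no format conversion. When a client downloads the file, it only needs to decode the bytes as UTF-8 text to recover the content.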
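As a small illustration of the text-file case, the sketch below round-trips a string through UTF-8 bytes using only the JDK. The object and method names are made up for this example and are not part of the project, and the sketch assumes the file was UTF-8 encoded to begin with.

```scala
import java.nio.charset.StandardCharsets

object BlobTextRoundTrip {
  // Encode a text document into the raw bytes that would travel over HTTP
  // and be stored as a blob.
  def toBlobBytes(text: String): Array[Byte] =
    text.getBytes(StandardCharsets.UTF_8)

  // Decode downloaded blob bytes back into text (assumes UTF-8 content).
  def fromBlobBytes(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

Calling BlobTextRoundTrip.fromBlobBytes on the result of toBlobBytes returns the original string, which is all the client has to do after a download.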

We start with the Model, which in Scala is expressed as a case class. A Model corresponds to a MongoDB Document, and in Scala code we work with the case class in place of the Document. Every Model we design extends a ModelBase trait:

trait ModelBase[E] {
  def to: E
}

case class Person(
                   userid: String = "",
                   name: String = "",
                   age: Option[Int] = None,
                   dob: Option[MGODate] = None,   // date of birth
                   address: Option[String] = None
                 ) extends ModelBase[Document] {
  import org.mongodb.scala.bson._

  override def to: Document = {
    var doc = Document(
      "userid" -> this.userid,
      "name" -> this.name)
    if (this.age != None)
      doc = doc + ("age" -> this.age.get)
    if (this.dob != None)
      doc = doc + ("dob" -> this.dob.get)
    if (this.address != None)
      doc = doc + ("address" -> this.address.getOrElse(""))
    doc
  }
}

object Person {
  val fromDocument: Document => Person = doc => {
    val keyset = doc.keySet
    Person(
      userid = doc.getString("userid"),
      name = doc.getString("name"),
      age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
      dob = {
        if (keyset.contains("dob"))
          Some(doc.getDate("dob"))
        else None
      },
      address = mgoGetStringOrNone(doc,"address")
    )
  }
}

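In the example above, Person corresponds to a Document in MongoDB. Besides matching the property types to the field types, it provides two conversion functions, to and fromDocument. The to function is inherited from ModelBase, which means every MongoDB Model must have it. This matters because when a Model is built from JSON, anything that is a ModelBase is guaranteed to have a to function that can be called: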

class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
...
  post {
    entity(as[String]) { json =>
      val extractedEntity: M = fromJson[M](json)
      val doc: Document = extractedEntity.to
      val futmsg = repository.insert(doc).value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(c) => c.toString()
              case None => "insert may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }

Note extractedEntity: we cannot yet tell its concrete type. Is it a Person, an Animal, a Machine? But we do know it is of type M, and M <: ModelBase[Document], so M is a MongoDB Model and we can call extractedEntity.to to obtain a Document.
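The effect of the type bound can be shown without the MongoDB driver at all. In the sketch below, Doc is a plain Map standing in for Document, and Machine and toDocument are hypothetical names that do not appear in the project; only the shape of ModelBase is taken from the post.

```scala
object ModelBoundSketch {
  // Stand-in for org.mongodb.scala.Document so the sketch needs no driver.
  type Doc = Map[String, Any]

  trait ModelBase[E] { def to: E }

  final case class Person(userid: String, name: String, age: Option[Int] = None)
      extends ModelBase[Doc] {
    // The optional field is only written when it carries a value.
    def to: Doc =
      Map[String, Any]("userid" -> userid, "name" -> name) ++ age.map(a => "age" -> a).toList
  }

  final case class Machine(serial: String) extends ModelBase[Doc] {
    def to: Doc = Map[String, Any]("serial" -> serial)
  }

  // Compiles for any M bounded by ModelBase[Doc]; the concrete type is irrelevant here.
  def toDocument[M <: ModelBase[Doc]](entity: M): Doc = entity.to
}
```

toDocument accepts a Person or a Machine alike, which is the same guarantee the route relies on when it calls extractedEntity.to.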

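Look closely and you will see that Person has no blob field. So far I have not found a way to send several fields and an image together in a single HTTP request; uploading one Document would take two requests. The HTTP server would then have to match and track the two requests, which gets very complicated. So a Document that needs a blob has to move the blob into a separate Document and link the two through a single unique id field: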

case class Photo (
                   id: String,
                   photo: Option[MGOBlob]
                 ) extends ModelBase[Document] {
  override def to: Document = {
    var doc = Document("id" -> this.id)
    if (photo != None)
      doc = doc + ("photo" -> this.photo)
    doc
  }
}

object Photo {
  def fromDocument: Document => Photo = doc => {
    val keyset = doc.keySet
    Photo(
      id = doc.getString("id"),
      photo = mgoGetBlobOrNone(doc, "photo")
    )
  }
}

Seen from another angle, storing the blob apart from the regular fields has its advantages too, and the cost is at most two queries.

The second part is the repository, the database operation functions:

class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
  def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
    var res = Seq[ResultOptions]()
    next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
    sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
    fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
    top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}

    val ctxFind = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
      .setCommand(Find(andThen = res))
    mgoQuery[Seq[R]](ctxFind,converter)
  }

  def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
    var res = Seq[ResultOptions]()
    next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
    sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
    fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
    top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
    val ctxFind = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
      .setCommand(Find(filter = Some(filtr),andThen = res))
    mgoQuery[Seq[R]](ctxFind,converter)
  }

  def getOneDocument(filtr: Bson): DBOResult[Document] = {
    val ctxFind = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
      .setCommand(Find(filter = Some(filtr),firstOnly = true))
    mgoQuery[Document](ctxFind,converter)
  }

  def insert(doc: Document): DBOResult[Completed] = {
    val ctxInsert = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
      .setCommand(Insert(Seq(doc)))
    mgoUpdate[Completed](ctxInsert)
  }

  def delete(filter: Bson): DBOResult[DeleteResult] = {
    val ctxDelete = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
      .setCommand(Delete(filter))
    mgoUpdate[DeleteResult](ctxDelete)
  }

  def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
    val ctxUpdate = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
      .setCommand(Update(filter,update,None,!many))
    mgoUpdate[UpdateResult](ctxUpdate)
  }

  def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
    val ctxUpdate = MGOContext(dbName = db,collName=coll)
      .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
      .setCommand(Replace(filter,row))
    mgoUpdate[UpdateResult](ctxUpdate)
  }
}
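The four foreach calls that build res in getAll and query could also be written without a var. The sketch below is only an illustration of that idea: FindOption and its cases are hypothetical stand-ins for the engine's ResultOptions and FOD_TYPE values, which are not reproduced here.

```scala
object FindOptionsSketch {
  // Hypothetical stand-ins for the engine's ResultOptions / FOD_TYPE.
  sealed trait FindOption
  final case class Filter(json: String) extends FindOption
  final case class Sort(json: String) extends FindOption
  final case class Projection(json: String) extends FindOption
  final case class Limit(n: Int) extends FindOption

  // Keeps the repository's order: next (extra filter), sort, fields, top.
  // Absent parameters simply drop out when the Options are flattened.
  def collect(next: Option[String], sort: Option[String],
              fields: Option[String], top: Option[Int]): Seq[FindOption] =
    Seq[Option[FindOption]](
      next.map(Filter(_)),
      sort.map(Sort(_)),
      fields.map(Projection(_)),
      top.map(Limit(_))
    ).flatten
}
```

Whether this reads better than the foreach version is a matter of taste; the behaviour for missing parameters is the same in both.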

This part was covered in the previous post. Last comes the core akka-http piece, the Route, which is the public API of the MongoDB CRUD service:

  (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
    (filter,fields,sort,top,next) => {
      dbor = {
        filter match {
          case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
          case None => repository.getAll(next,sort,fields,top)
        }
      }
      val futRows = dbor.value.value.runToFuture.map {
        eolr =>
          eolr match {
            case Right(olr) => olr match {
              case Some(lr) => lr
              case None => Seq[M]()
            }
            case Left(_) => Seq[M]()
          }
      }
      complete(futureToJson(futRows))
    }
  } ~ post {
    entity(as[String]) { json =>
      val extractedEntity: M = fromJson[M](json)
      val doc: Document = extractedEntity.to
      val futmsg = repository.insert(doc).value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(c) => c.toString()
              case None => "insert may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }
      complete(futmsg)
    }
  } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
    val bson = Document(filter)
    if (set == None) {
      entity(as[String]) { json =>
        val extractedEntity: M = fromJson[M](json)
        val doc: Document = extractedEntity.to
        val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
          eoc =>
            eoc match {
              case Right(oc) => oc match {
                case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                case None => "update may not complete!"
              }
              case Left(err) => err.getMessage
            }
        }
        complete(futureToJson(futmsg))
      }
    } else {
      set match {
        case Some(u) =>
          val ubson = Document(u)
          dbou = repository.update(bson, ubson, many.getOrElse(true))
        case None =>
          dbou = Left(new IllegalArgumentException("missing set statement for update!"))
      }
      val futmsg = dbou.value.value.runToFuture.map {
        eoc =>
          eoc match {
            case Right(oc) => oc match {
              case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
              case None => "update may not complete!"
            }
            case Left(err) => err.getMessage
          }
      }
      complete(futureToJson(futmsg))
    }
  } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
    val bson = Document(filter)
    val futmsg = repository.delete(bson).value.value.runToFuture.map {
      eoc =>
        eoc match {
          case Right(oc) => oc match {
            case Some(d) => s"${d.getDeletedCount} rows deleted."
            case None => "delete may not complete!"
          }
          case Left(err) => err.getMessage
        }
    }
    complete(futureToJson(futmsg))
  }
}
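Every branch of the route unwraps the same Either-of-Option shape before building a reply string. One possible way to cut the repetition, assuming the value that reaches map really is an Either[Throwable, Option[A]] as the pattern matches suggest, is a small helper like the following. The name describe and its signature are assumptions of this sketch, not part of the project.

```scala
object ResultMessages {
  // Collapse Either[Throwable, Option[A]] into a single reply string:
  //   Right(Some(v)) -> onSuccess(v)
  //   Right(None)    -> the "may not complete" message
  //   Left(err)      -> the error's message
  def describe[A](result: Either[Throwable, Option[A]], incomplete: String)(
      onSuccess: A => String): String =
    result match {
      case Right(Some(value)) => onSuccess(value)
      case Right(None)        => incomplete
      case Left(err)          => err.getMessage
    }
}
```

With such a helper, the body of the delete branch's map could shrink to one call along the lines of describe(eoc, "delete may not complete!")(d => s"${d.getDeletedCount} rows deleted.").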

The biggest difference from the previous post is that this Route supports MongoDB-style query strings, with BSON-typed parameters. For example:

http://192.168.0.189:50081/private/crud/person
http://192.168.0.189:50081/private/crud/person?filter={"userid":"c001"}
http://192.168.0.189:50081/private/crud/person?sort={"userid":-1}
http://192.168.0.189:50081/private/crud/person?filter={"userid":{$gt:"c000"}}&sort={"userid":-1}&top=3

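Unfortunately, some characters in BSON expressions are not allowed in URLs, so they must be encoded first. A public online URL encoder can do the conversion: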

https://www.url-encoder.com
{"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D

In a program, a library can do the same job: add "com.github.tasubo" % "jurl-tools" % "0.6" and call URLEncode.encode(xyz):

val sort =
  """
    |{userid:-1}
  """.stripMargin

val getAllRequest = HttpRequest(
  HttpMethods.GET,
  uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
).addHeader(authentication)
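If adding a dependency is not desirable, the JDK's java.net.URLEncoder produces the same output for the examples in this post. The QueryUrl helper below is a hypothetical sketch, not code from the project; it assembles a CRUD URL from optional filter, sort, and top values. One caveat: URLEncoder performs form encoding, so a space becomes + rather than %20.

```scala
import java.net.URLEncoder

object QueryUrl {
  private def enc(s: String): String = URLEncoder.encode(s, "UTF-8")

  // Build a CRUD URL, percent-encoding each JSON-valued parameter.
  // Parameters that are None are left out of the query string.
  def build(base: String,
            filter: Option[String] = None,
            sort: Option[String] = None,
            top: Option[Int] = None): String = {
    val params = List(
      filter.map(f => "filter=" + enc(f)),
      sort.map(s => "sort=" + enc(s)),
      top.map(t => "top=" + t)
    ).flatten
    if (params.isEmpty) base else base + "?" + params.mkString("&")
  }
}
```

For instance, passing filter = Some("""{"userid":"c001"}""") yields the same %7B%22userid%22%3A%22c001%22%7D sequence shown for the online encoder.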

The API Route for the blob service:

pathPrefix("blob") {
  (get & path(Remaining)) { id =>
    val filtr = equal("id", id)
    val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
      eodoc =>
        eodoc match {
          case Right(odoc) => odoc match {
            case Some(doc) =>
              if (doc == null) None
              else mgoGetBlobOrNone(doc, "photo")
            case None => None
          }
          case Left(_) => None
        }
    }
    onComplete(futOptPic) {
      case Success(optBlob) => optBlob match {
        case Some(blob) =>
          withoutSizeLimit {
            encodeResponseWith(Gzip) {
              complete(
                HttpEntity(
                  ContentTypes.`application/octet-stream`,
                  ByteArrayToSource(blob.getData))
              )
            }
          }
        case None => complete(StatusCodes.NotFound)
      }
      case Failure(err) => complete(err)
    }
  } ~
  (post &  parameter('id)) { id =>
    withoutSizeLimit {
      decodeRequest {
        extractDataBytes { bytes =>
          val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
            hd ++ bs
          }
          onComplete(fut) {
            case Success(b) =>
              val doc = Document("id" -> id, "photo" -> b.toArray)
              val futmsg = repository.insert(doc).value.value.runToFuture.map {
                eoc =>
                  eoc match {
                    case Right(oc) => oc match {
                      case Some(c) => c.toString()
                      case None => "insert may not complete!"
                    }
                    case Left(err) => err.getMessage
                  }
              }
              complete(futmsg)
            case Failure(err) => complete(err)
          }
        }
      }
    }
  }
}
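The POST branch folds every incoming chunk into one ByteString before inserting the Document. The same accumulation can be sketched with plain byte arrays and no akka dependency; ChunkFold and concat are hypothetical names used only for this illustration.

```scala
object ChunkFold {
  // Concatenate chunks in arrival order, mirroring
  // runFold(ByteString()) { case (hd, bs) => hd ++ bs } in the route.
  def concat(chunks: Iterator[Array[Byte]]): Array[Byte] =
    chunks.foldLeft(Array.emptyByteArray)(_ ++ _)
}
```

Written this way it is easy to see that the whole upload is held in memory before it reaches MongoDB, which is worth keeping in mind for large files.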

Note that MongoRoute[M] is a generic type. I want the Route for any Model to need nothing more than specifying M, for example:

implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
...
pathPrefix("public") {
  (pathPrefix("crud")) {
    new MongoRoute[Person]("person")(personDao)
      .route ~
      new MongoRoute[Photo]("photo")(picDao)
        .route
  }
}

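Much less work, isn't it? But back to the original issue: a blob needs no format conversion anywhere along the way, so the blob route builds the Document directly from the raw bytes instead of going through a Model, and the id field name is therefore fixed. Keep this in mind when designing the collection structure.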

Testing an HTTP server can be a headache. A browser can only test GET, so how should POST, PUT, and DELETE be tested? curl works:

curl -i -X GET http://rest-api.io/items
curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X POST -H 'Content-Type: application/json' -d '{"name": "New item", "year": "2009"}' http://rest-api.io/items
curl -i -X PUT -H 'Content-Type: application/json' -d '{"name": "Updated item", "year": "2010"}' http://rest-api.io/items/5069b47aa892630aae059584

Below are two clients, one testing crud and one testing blob:

TestCrudClient.scala

import akka.actor._
import akka.http.scaladsl.model.headers._

import scala.concurrent._
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import com.github.tasubo.jurl.URLEncode
import com.datatech.rest.mongo.MongoModels.Person
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
import com.datatech.sdp.mongo.engine.MGOClasses._

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object TestCrudClient {

  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    import JsConverters._

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    // Log in with basic credentials to obtain a JWT.
    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.0.189:50081/auth",
      headers = List(authorization)
    )

    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)
    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    // GET all rows, sorted by userid descending.
    val sort =
      """
        |{userid:-1}
      """.stripMargin

    val getAllRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
    ).addHeader(authentication)
    val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest)
    println(Await.result(futGetAll,2 seconds))
    scala.io.StdIn.readLine()

    // DELETE rows matching a filter.
    var bf =
      """
        |{"userid":"c888"}
      """.stripMargin

    println(URLEncode.encode(bf))

    val delRequest = HttpRequest(
      HttpMethods.DELETE,
      uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
    ).addHeader(authentication)
    val futDel: Future[HttpResponse] = Http().singleRequest(delRequest)
    println(Await.result(futDel,2 seconds))
    scala.io.StdIn.readLine()

    // GET rows matching a filter.
    bf =
      """
        |{"userid":"c001"}
      """.stripMargin

    val getRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf),
    ).addHeader(authentication)
    val futGet: Future[HttpResponse] = Http().singleRequest(getRequest)
    println(Await.result(futGet,2 seconds))
    scala.io.StdIn.readLine()

    val tiger = Person("c001","tiger chan",Some(56))
    val john = Person("c002", "johnny dep", Some(60))
    val peter = Person("c003", "pete brad", Some(58))
    val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
    val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )

    // POST a new row; the Person is marshalled to JSON.
    val saveRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.0.189:50081/public/crud/person"
    ).addHeader(authentication)
    val futPost: Future[HttpResponse] =
      for {
        reqEntity <- Marshal(peter).to[RequestEntity]
        response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))
      } yield response

    println(Await.result(futPost,2 seconds))
    scala.io.StdIn.readLine()

    // PUT with a set parameter updates the matched rows in place.
    var set =
      """
        | {$set:
        |   {
        |    name:"tiger the king",
        |    age:18
        |   }
        | }
      """.stripMargin

    val updateRequest = HttpRequest(
      HttpMethods.PUT,
      uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)+"&set="+URLEncode.encode(set)+"&many=true"
    ).addHeader(authentication)

    val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest)
    println(Await.result(futUpdate,2 seconds))
    scala.io.StdIn.readLine()

    // PUT without a set parameter replaces the matched row with the request body.
    val repRequest = HttpRequest(
      HttpMethods.PUT,
      uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
    ).addHeader(authentication)
    val futReplace: Future[HttpResponse] =
      for {
        reqEntity <- Marshal(susan).to[RequestEntity]
        response <- Http().singleRequest(repRequest.copy(entity=reqEntity))
      } yield response

    println(Await.result(futReplace,2 seconds))
    scala.io.StdIn.readLine()

    system.terminate()

  }

}

TestFileClient.scala

import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.rest.mongo.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._

case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher
  implicit val mat = ActorMaterializer()
  def createEntity(file: File): RequestEntity = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
  }

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    implicit val mat = ActorMaterializer()
    import sys.dispatcher
    val futResp = Http(sys).singleRequest(
      //   Gzip.encodeMessage(
      request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
      //   )
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  def downloadFileTo(request: HttpRequest, destPath: String) = {
    //  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
    val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download file!")
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

}

object TestFileClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.0.189:50081/auth",
      headers = List(authorization)
    )

    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val entity = HttpEntity(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
    )
    //
    val chunked = HttpEntity.Chunked.fromData(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
    )

    val uploadRequest = HttpRequest(
      HttpMethods.POST,
      //      uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg",
      uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg",
    ).addHeader(authentication)

    //upload file
    Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
    //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)

    val dlRequest = HttpRequest(
      HttpMethods.GET,
      //      uri = "http://192.168.0.189:50081/api/file/mypic.jpg",
      uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg",
    ).addHeader(authentication)

    FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

    scala.io.StdIn.readLine()
    system.terminate()
  }

}

Below is the source code for this demonstration:

build.sbt

name := "rest-mongo"

version := "0.1"

scalaVersion := "2.12.8"

scalacOptions += "-Ypartial-unification"
val akkaVersion = "2.5.23"
val akkaHttpVersion = "10.1.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7",

  // for scalikejdbc
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"  %  "h2" % "1.4.199",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.3.2",
  //for cassandra 3.6.0
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "io.monix" %% "monix" % "3.0.0-RC3",
  "org.typelevel" %% "cats-core" % "2.0.0-M4",
  "com.github.tasubo" % "jurl-tools" % "0.6"
)

MongoHttpServer.scala

package com.datatech.rest.mongo

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._
import org.mongodb.scala._

import scala.collection.JavaConverters._
import MongoModels._
import MongoRepo._
import MongoRoute._

object MongoHttpServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  val settings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
    .build()
  implicit val client: MongoClient = MongoClient(settings)
  implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
  implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)

  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
    path("auth") {
      authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
        post { complete(authenticator.issueJwt(userinfo))}
      }
    } ~
      pathPrefix("private") {
        authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>
          FileRoute(validToken)
            .route
          // ~ ...
        }
      } ~
      pathPrefix("public") {
        (pathPrefix("crud")) {
          new MongoRoute[Person]("person")(personDao)
            .route ~
            new MongoRoute[Photo]("photo")(picDao)
              .route
        }
      }

  val (port, host) = (50081,"192.168.0.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

ModelBase.scala

package com.datatech.rest.mongo

trait ModelBase[E] {
  def to: E
}

MongoModel.scala

package com.datatech.rest.mongo
import org.mongodb.scala._
import com.datatech.sdp.mongo.engine._
import MGOClasses._

object MongoModels {

  case class Person(
                     userid: String = "",
                     name: String = "",
                     age: Option[Int] = None,
                     dob: Option[MGODate] = None,
                     address: Option[String] = None
                   ) extends ModelBase[Document] {
    import org.mongodb.scala.bson._

    override def to: Document = {
      var doc = Document(
        "userid" -> this.userid,
        "name" -> this.name)
      if (this.age != None)
        doc = doc + ("age" -> this.age.get)
      if (this.dob != None)
        doc = doc + ("dob" -> this.dob.get)
      if (this.address != None)
        doc = doc + ("address" -> this.address.getOrElse(""))
      doc
    }
  }

  object Person {
    val fromDocument: Document => Person = doc => {
      val keyset = doc.keySet
      Person(
        userid = doc.getString("userid"),
        name = doc.getString("name"),
        age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
        dob = {
          if (keyset.contains("dob"))
            Some(doc.getDate("dob"))
          else None
        },
        address = mgoGetStringOrNone(doc,"address")
      )
    }
  }

  case class Photo (
                     id: String,
                     photo: Option[MGOBlob]
                   ) extends ModelBase[Document] {
    override def to: Document = {
      var doc = Document("id" -> this.id)
      if (photo != None)
        doc = doc + ("photo" -> this.photo)
      doc
    }
  }

  object Photo {
    def fromDocument: Document => Photo = doc => {
      val keyset = doc.keySet
      Photo(
        id = doc.getString("id"),
        photo = mgoGetBlobOrNone(doc, "photo")
      )
    }
  }

}

MongoRepo.scala

package com.datatech.rest.mongo
import org.mongodb.scala._
import org.bson.conversions.Bson
import org.mongodb.scala.result._
import com.datatech.sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import MGOCommands._
import com.datatech.sdp.result.DBOResult.DBOResult
import MongoModels._

object MongoRepo {

  class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
    def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
      var res = Seq[ResultOptions]()
      next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
      sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
      fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
      top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}

      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(andThen = res))
      mgoQuery[Seq[R]](ctxFind,converter)
    }

    def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
      var res = Seq[ResultOptions]()
      next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
      sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
      fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
      top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(filter = Some(filtr),andThen = res))
      mgoQuery[Seq[R]](ctxFind,converter)
    }

    def getOneDocument(filtr: Bson): DBOResult[Document] = {
      val ctxFind = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
        .setCommand(Find(filter = Some(filtr),firstOnly = true))
      mgoQuery[Document](ctxFind,converter)
    }

    def insert(doc: Document): DBOResult[Completed] = {
      val ctxInsert = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Insert(Seq(doc)))
      mgoUpdate[Completed](ctxInsert)
    }

    def delete(filter: Bson): DBOResult[DeleteResult] = {
      val ctxDelete = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Delete(filter))
      mgoUpdate[DeleteResult](ctxDelete)
    }

    def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
      val ctxUpdate = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Update(filter,update,None,!many))
      mgoUpdate[UpdateResult](ctxUpdate)
    }

    def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
      val ctxUpdate = MGOContext(dbName = db,collName=coll)
        .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
        .setCommand(Replace(filter,row))
      mgoUpdate[UpdateResult](ctxUpdate)
    }

  }

}
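
In the repository above, `getAll` and `query` both assemble their result options by appending to a `var` inside four `foreach` calls. As a minimal sketch of the same idea without mutation, the optional parameters can be mapped to options and then flattened, so only the ones actually supplied survive. The types below (`Opt`, `Filter`, `Sort`, `Projection`, `Limit`) and the `ResultOpts` object are hypothetical stand-ins for illustration, not the real `ResultOptions` class from the engine:

```scala
// Hypothetical stand-ins for ResultOptions; NOT the real MGOClasses types.
sealed trait Opt
final case class Filter(json: String) extends Opt
final case class Sort(json: String) extends Opt
final case class Projection(json: String) extends Opt
final case class Limit(n: Int) extends Opt

object ResultOpts {
  // Map each optional parameter to an option value, then flatten:
  // parameters that are None simply drop out, and the order stays fixed.
  def build(next: Option[String],
            sort: Option[String],
            fields: Option[String],
            top: Option[Int]): Seq[Opt] =
    Seq[Option[Opt]](
      next.map(Filter(_)),
      sort.map(Sort(_)),
      fields.map(Projection(_)),
      top.map(Limit(_))
    ).flatten
}
```

A helper like this could be shared by `getAll` and `query`, which currently repeat the same four lines; whether that fits the real `ResultOptions` constructor is an assumption that would need checking against the engine code.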

MongoRoute.scala

package com.datatech.rest.mongo
import akka.http.scaladsl.server.Directives

import scala.util._
import org.mongodb.scala._
import com.datatech.sdp.file.Streaming._
import org.mongodb.scala.result._
import MongoRepo._
import akka.stream.ActorMaterializer
import com.datatech.sdp.result.DBOResult._
import org.mongodb.scala.model.Filters._
import com.datatech.sdp.mongo.engine.MGOClasses._
import monix.execution.CancelableFuture
import akka.util._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
object MongoRoute {
  class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
    implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {
    import monix.execution.Scheduler.Implicits.global
    var dbor: DBOResult[Seq[M]] = _
    var dbou: DBOResult[UpdateResult] = _
    val route = pathPrefix(pathName) {
      pathPrefix("blob") {
        (get & path(Remaining)) { id =>
          val filtr = equal("id", id)
          val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
            eodoc =>
              eodoc match {
                case Right(odoc) => odoc match {
                  case Some(doc) =>
                    if (doc == null) None
                    else mgoGetBlobOrNone(doc, "photo")
                  case None => None
                }
                case Left(_) => None
              }
          }
          onComplete(futOptPic) {
            case Success(optBlob) => optBlob match {
              case Some(blob) =>
                withoutSizeLimit {
                  encodeResponseWith(Gzip) {
                    complete(
                      HttpEntity(
                        ContentTypes.`application/octet-stream`,
                        ByteArrayToSource(blob.getData))
                    )
                  }
                }
              case None => complete(StatusCodes.NotFound)
            }
            case Failure(err) => complete(err)
          }
        } ~
        (post & parameter('id)) { id =>
          withoutSizeLimit {
            decodeRequest {
              extractDataBytes { bytes =>
                val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
                  hd ++ bs
                }
                onComplete(fut) {
                  case Success(b) =>
                    val doc = Document("id" -> id, "photo" -> b.toArray)
                    val futmsg = repository.insert(doc).value.value.runToFuture.map {
                      eoc =>
                        eoc match {
                          case Right(oc) => oc match {
                            case Some(c) => c.toString()
                            case None => "insert may not complete!"
                          }
                          case Left(err) => err.getMessage
                        }
                    }
                    complete(futmsg)
                  case Failure(err) => complete(err)
                }
              }
            }
          }
        }
      } ~
      (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
        (filter,fields,sort,top,next) => {
          dbor = {
            filter match {
              case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
              case None => repository.getAll(next,sort,fields,top)
            }
          }
          val futRows = dbor.value.value.runToFuture.map {
            eolr =>
              eolr match {
                case Right(olr) => olr match {
                  case Some(lr) => lr
                  case None => Seq[M]()
                }
                case Left(_) => Seq[M]()
              }
          }
          complete(futureToJson(futRows))
        }
      } ~ post {
        entity(as[String]) { json =>
          val extractedEntity: M = fromJson[M](json)
          val doc: Document = extractedEntity.to
          val futmsg = repository.insert(doc).value.value.runToFuture.map {
            eoc =>
              eoc match {
                case Right(oc) => oc match {
                  case Some(c) => c.toString()
                  case None => "insert may not complete!"
                }
                case Left(err) => err.getMessage
              }
          }

          complete(futmsg)
        }
      } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
        val bson = Document(filter)
        if (set == None) {
          entity(as[String]) { json =>
            val extractedEntity: M = fromJson[M](json)
            val doc: Document = extractedEntity.to
            val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
              eoc =>
                eoc match {
                  case Right(oc) => oc match {
                    case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                    case None => "update may not complete!"
                  }
                  case Left(err) => err.getMessage
                }
            }
            complete(futureToJson(futmsg))
          }
        } else {
          set match {
            case Some(u) =>
              val ubson = Document(u)
              dbou = repository.update(bson, ubson, many.getOrElse(true))
            case None =>
              dbou = Left(new IllegalArgumentException("missing set statement for update!"))
          }
          val futmsg = dbou.value.value.runToFuture.map {
            eoc =>
              eoc match {
                case Right(oc) => oc match {
                  case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
                  case None => "update may not complete!"
                }
                case Left(err) => err.getMessage
              }
          }
          complete(futureToJson(futmsg))
        }
      } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>
        val bson = Document(filter)
        val futmsg = repository.delete(bson).value.value.runToFuture.map {
          eoc =>
            eoc match {
              case Right(oc) => oc match {
                case Some(d) => s"${d.getDeletedCount} rows deleted."
                case None => "delete may not complete!"
              }
              case Left(err) => err.getMessage
            }
        }
        complete(futureToJson(futmsg))
      }
    }
  }

}
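
The GET route above reads `filter`, `fields`, `sort`, `top` and `next` from the query string, and the string values are parsed as JSON via `Document(...)`, so a client has to percent-encode them before putting them in a URL. The following is a minimal client-side sketch using only the JDK. The `QueryUrl` and `QueryUrlDemo` objects, the host, the port and the `person` path are hypothetical and only illustrate the shape of a request:

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object QueryUrl {
  // Percent-encode one query value. URLEncoder does form encoding and
  // emits '+' for a space, so rewrite that to %20 to stay unambiguous.
  def enc(s: String): String =
    URLEncoder.encode(s, StandardCharsets.UTF_8.name()).replace("+", "%20")

  // Build a GET URL from whichever optional parameters are present.
  def build(base: String, params: Seq[(String, Option[String])]): String = {
    val qs = params.collect { case (k, Some(v)) => s"$k=${enc(v)}" }
    if (qs.isEmpty) base else base + "?" + qs.mkString("&")
  }
}

object QueryUrlDemo {
  // Hypothetical endpoint: "person" stands for the pathName given to a MongoRoute.
  val url: String = QueryUrl.build(
    "http://localhost:8080/person",
    Seq(
      "filter" -> Some("""{"age":{"$gt":30}}"""), // MongoDB filter as JSON
      "sort"   -> Some("""{"name":1}"""),         // ascending by name
      "fields" -> None,                           // omitted: no projection
      "top"    -> Some("5")                       // limit to 5 documents
    )
  )
}
```

Parameters left as `None` are dropped from the URL, which matches the route's use of optional parameters (`'fields.?` and so on).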