在前面几篇讨论里我们都提到过:Akka-http是一项系统集成工具库。它是以数据交换的形式进行系统集成的。所以,Akka-http的核心功能应该是数据交换的实现了:应该能通过某种公开的数据格式和传输标准比较方便的实现包括异类系统之间通过网上进行的数据交换。覆盖包括:数据编码、发送和数据接收、解析全过程。Akka-http提供了许多网上传输标准数据的概括模型以及数据类型转换方法,可以使编程人员很方便的构建网上往来的Request和Response。但是,现实中的数据交换远远不止针对request和response操作能够满足的。系统之间数据交换经常涉及文件或者数据库表类型的数据上传下载。虽然在Http标准中描述了如何通过MultiPart消息类型进行批量数据的传输,但是这个标准涉及的实现细节包括数据内容描述、数据分段方式、消息数据长度计算等等简直可以立即令人却步。Akka-http是基于Akka-stream开发的:不但它的工作流程可以用Akka-stream来表达,它还支持stream化的数据传输。我们知道:Akka-stream提供了功能强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单来说:Akka-http的消息数据内容HttpEntity可以支持理论上无限长度的data-stream。最可贵的是:这个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以有效应付数据交换参与两方Reactive端点不同的数据传输速率。

Akka-http的stream类型数据内容是以Source[T,_]类型表示的。首先,Akka-stream通过FileIO对象提供了足够多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型:

/**
* Creates a Source from a files contents.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[akka.stream.ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* @param f the file path to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def fromPath(f: Path, chunkSize: Int = ): Source[ByteString, Future[IOResult]] =
fromPath(f, chunkSize, startPosition = )

这个函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供Marshaller[ByteString,MessageEntity]类型的隐式实例:

trait JsonCodec extends Json4sSupport {
import org.json4s.DefaultFormats
import org.json4s.ext.JodaTimeSerializers
implicit val serilizer = jackson.Serialization
implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec object ServerStreaming extends App {
import JsConverters._
...

我们还需要Json-Streaming支持:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
.withParallelMarshalling(parallelism = , unordered = false)

FileIO是blocking操作,我们还可以选用独立的线程供blocking操作使用:

   FileIO.fromPath(file, )
.withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))

现在我们可以从在server上用一个文件构建Source然后再转成Response:

  val route =
get {
path("files"/Remaining) { name =>
complete(loadFile(name))
}
}
def loadFile(path: String) = {
// implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get("/Users/tiger/"+path)
FileIO.fromPath(file, )
.withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
.map(_.utf8String)
}

同样,我们也可以把数据库表内数据转成Akka-Stream-Source,然后再实现到MessageEntity的转换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

object SlickDAO {
import slick.jdbc.H2Profile.api._
val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
val db = dbConfig.db case class CountyModel(id: Int, name: String)
case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
def name = column[String]("NAME",O.Length())
def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
}
val CountyQuery = TableQuery[CountyTable] def loadTable(filter: String) = {
// implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
val publisher = db.stream(qry.result)
Source.fromPublisher(publisher = publisher)
.withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
}
}

然后进行到MessageEntity的转换:

  val route =
get {
path("files"/Remaining) { name =>
complete(loadFile(name))
} ~
path("tables"/Segment) { t =>
complete(SlickDAO.loadTable(t))
}
}

下面是本次示范的完整源代码:

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson object SlickDAO {
import slick.jdbc.H2Profile.api._
val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
val db = dbConfig.db case class CountyModel(id: Int, name: String)
case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
def name = column[String]("NAME",O.Length())
def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
}
val CountyQuery = TableQuery[CountyTable] def loadTable(filter: String) = {
// implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
val publisher = db.stream(qry.result)
Source.fromPublisher(publisher = publisher)
.withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
}
} trait JsonCodec extends Json4sSupport {
import org.json4s.DefaultFormats
import org.json4s.ext.JodaTimeSerializers
implicit val serilizer = jackson.Serialization
implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec object ServerStreaming extends App {
import JsConverters._ implicit val httpSys = ActorSystem("httpSystem")
implicit val httpMat = ActorMaterializer()
implicit val httpEC = httpSys.dispatcher implicit val jsonStreamingSupport = EntityStreamingSupport.json()
.withParallelMarshalling(parallelism = , unordered = false) val (port, host) = (,"localhost") val route =
get {
path("files"/Remaining) { name =>
complete(loadFile(name))
} ~
path("tables"/Segment) { t =>
complete(SlickDAO.loadTable(t))
}
} def loadFile(path: String) = {
// implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
val file = Paths.get("/Users/tiger/"+path)
FileIO.fromPath(file, )
.withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
.map(_.utf8String)
} val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind())
.onComplete(_ => httpSys.terminate()) }

最新文章

  1. python征程1.3(初识python)
  2. JSON,Bean,XML,List,Map
  3. 原创:SAP LVC ALV编辑小技巧
  4. jquery easy ui 1.3.4 事件与方法的使用(3)
  5. HTML中的一些常见的事件句柄
  6. android学习笔记47——读写SD卡上的文件
  7. UITableview刷新某一个cell或section
  8. oracle学习 十二 使用.net程序调用带返回值的存储过程(持续更新)
  9. 69 Spring Interview Questions and Answers – The ULTIMATE List--reference
  10. Nginx高性能服务器安装、配置、运维 (1) —— Nginx简介
  11. C++ Primer 5th 第16章 模板与泛型编程
  12. Eclipse 安装mybatis的编辑插件
  13. 基于Azure blob storage T级别HBase表恢复
  14. wife信号如何传播
  15. 分享一个完整的Mybatis分页解决方案
  16. Java开发中碰到的Map的坑
  17. MongoDB3.2.22快速入门与使用【未完待续】
  18. JAVA学习笔记(1)—— eclipse自动补全和主题及字体配置
  19. 游戏编程算法与技巧 Game Programming Algorithms and Techniques (Sanjay Madhav 著)
  20. ML.NET 示例:二元分类之垃圾短信检测

热门文章

  1. JSP内置对象的实验报告,页面登陆设计
  2. Hadoop 一: NCDC 数据准备
  3. 《深入浅出设计模式》读书笔记 C#版(第一章)
  4. 线段树专题—ZOJ1610 Count the Colors(涂区间,直接tag标记)
  5. hadoop(二)搭建伪分布式集群
  6. SQLServer中重建聚集索引之后会影响到非聚集索引的索引碎片吗
  7. C#仪器数据文件解析-XPS文件
  8. ch1-vuejs基础入门(hw v-bind v-if v-for v-on v-model 应用组件简介 小案例)
  9. HDU3844Tour (好题)
  10. Oracle的常用命令之备份和恢复数据库