In the previous post we covered streaming a JDBC database over gRPC; in this one we look at how to implement streaming for cassandra. If we need to read cassandra data from a node or terminal that has no cassandra deployment, we can use gRPC to build a data bridge between the two ends. In that setup the cassandra side acts as the gRPC server and provides the cassandra data services.

In earlier posts of the sdp series we already implemented Cassandra-Engine. It works by packing commands into some kind of Context and submitting them to cassandra for execution. Let's start with an example that creates a table. The CQL statements and the Cassandra-Engine code are shown below; this is the client side:

  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"

  val createCQL = """
    CREATE TABLE testdb.AQMRPT (
      rowid bigint primary key,
      measureid bigint,
      statename text,
      countyname text,
      reportyear int,
      value int,
      created timestamp
    )"""

  val cqlddl = CQLUpdate(statements = Seq(dropCQL, createCQL))

  def createTbl: Source[CQLResult, NotUsed] = {
    log.info(s"running createTbl ...")
    Source
      .single(cqlddl)
      .via(stub.runDDL)
  }
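
Nothing is executed until this Source is materialized. Below is a minimal sketch of running it on the client, assuming the gRPC channel and an implicit ActorSystem/Materializer are already set up as in the full client listing, and relying on the fact that the server wraps cqlExecute's Boolean result with marshal(...):

  // sketch: materialize the DDL source and print the unmarshalled server reply
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import protobuf.bytes.Converter._

  implicit val sys = ActorSystem("cql-client")
  implicit val mat = ActorMaterializer()
  import sys.dispatcher

  createTbl
    .runWith(Sink.foreach { res => println(s"DDL applied: ${unmarshal[Boolean](res.result)}") })
    .onComplete(_ => sys.terminate())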

First we pack the two statements, dropCQL and createCQL, into CQLUpdate, the protobuf message that corresponds to the Context, so we can expect this to run as a batch-style operation. Then, as usual, we use the streaming programming model. In the .proto file the Context and the Service are defined with the following DDL:

message CQLUpdate {
  repeated string statements = 1;
  bytes parameters = 2;
  google.protobuf.Int32Value consistency = 3;
  google.protobuf.BoolValue batch = 4;
}

service CQLServices {
  rpc runDDL(CQLUpdate) returns (CQLResult) {}
}
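
For orientation, the grpcakkastream generator maps every rpc to an Akka Streams Flow on both ends, which is what lets the client plug the stub straight into a stream graph. The sketch below only indicates the generated shapes (abridged; the real generated file carries the full gRPC plumbing):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import sdp.grpc.services._

// server side: the generated service trait expects one Flow per rpc;
// CQLStreamingServices further below overrides exactly this member
trait CQLServices {
  def runDDL: Flow[CQLUpdate, CQLResult, NotUsed]
}

// client side: CqlGrpcAkkaStream.stub(channel) exposes runDDL with the same
// Flow[CQLUpdate, CQLResult, NotUsed] shape, hence Source.single(cqlddl).via(stub.runDDL)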

The service function runDDL is implemented as follows:

  override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
    Flow[CQLUpdate]
      .flatMapConcat { context =>
        //unpack CQLUpdate and construct the context
        val ctx = CQLContext(context.statements)
        log.info(s"**** CQLContext => ${ctx} ***")
        Source
          .fromFuture(cqlExecute(ctx))
          .map { r => CQLResult(marshal(r)) }
      }
  }

Here we call Cassandra-Engine's cqlExecute(ctx) function:

  def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    var invalidBat = false
    if (ctx.batch) {
      if (ctx.parameters == Nil)
        invalidBat = true
      else if (ctx.parameters.size < 2)
        invalidBat = true
    }
    if (!ctx.batch || invalidBat) {
      if (invalidBat)
        log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")

      if (ctx.statements.size == 1) {
        var param: Seq[Object] = Nil
        if (ctx.parameters != Nil) param = ctx.parameters.head
        log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
        cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
      }
      else {
        var params: Seq[Seq[Object]] = Nil
        if (ctx.parameters == Nil)
          params = Seq.fill(ctx.statements.length)(Nil)
        else {
          if (ctx.statements.size > ctx.parameters.size) {
            log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
            val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
            params = ctx.parameters ++ nils
          }
          else
            params = ctx.parameters
        }
        val commands: Seq[(String, Seq[Object])] = ctx.statements zip params
        log.info(s"cqlExecute> multi-commands: ${commands}")
        /*
        //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
        //therefore, make sure no command relies on prev command effect
        val lstCmds: List[Future[Boolean]] = commands.map { case (stmt, param) =>
          cqlSingleUpdate(ctx.consistency, stmt, param)
        }.toList
        val futList = lstCmds.sequence.map(_ => true) //must map to execute
        */
        /*
        //using traverse to have some degree of parallelism = max(runtimes)
        //therefore, make sure no command relies on prev command effect
        val futList = Future.traverse(commands) { case (stmt, param) =>
          cqlSingleUpdate(ctx.consistency, stmt, param)
        }.map(_ => true)
        Await.result(futList, 3 seconds)
        Future.successful(true)
        */
        // run sync directly
        Future {
          commands.foreach { case (stm, pars) =>
            cqlExecuteSync(ctx.consistency, stm, pars)
          }
          true
        }
      }
    }
    else
      cqlBatchUpdate(ctx)
  }

The code of this function is shown in full because running multiple statements in one batch may involve non-blocking and parallel computation. The commented-out blocks above show how the functional (cats) combinators sequence and traverse can be used to run a list of Futures.
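
As a standalone illustration of those two commented-out alternatives, here is a small self-contained sketch of how cats' sequence and the standard library's Future.traverse collapse a collection of Futures into one Future; the statements and the runCmd function are hypothetical placeholders, not part of Cassandra-Engine:

import cats.implicits._                       // .sequence syntax for List[Future[_]]
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureCombinatorsDemo extends App {
  // hypothetical stand-in for cqlSingleUpdate: pretend to run one statement
  def runCmd(stmt: String): Future[Boolean] = Future { println(s"running: $stmt"); true }

  val stmts = List("cmd-1", "cmd-2", "cmd-3")

  // sequence: List[Future[Boolean]] => Future[List[Boolean]]
  // the futures are started eagerly by map, so they may run concurrently
  val futs: List[Future[Boolean]] = stmts.map(runCmd)
  val viaSequence: Future[List[Boolean]] = futs.sequence

  // traverse: map + sequence in one step
  val viaTraverse: Future[List[Boolean]] = Future.traverse(stmts)(runCmd)

  println(Await.result(viaSequence, 3.seconds))
  println(Await.result(viaTraverse, 3.seconds))
}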

The next example streams data from a JDBC database into the cassandra database. The .proto DDL is as follows:

message ProtoDate {
  int32 yyyy = 1;
  int32 mm = 2;
  int32 dd = 3;
}

message ProtoTime {
  int32 hh = 1;
  int32 mm = 2;
  int32 ss = 3;
  int32 nnn = 4;
}

message ProtoDateTime {
  ProtoDate date = 1;
  ProtoTime time = 2;
}

message AQMRPTRow {
  int64 rowid = 1;
  string countyname = 2;
  string statename = 3;
  int64 measureid = 4;
  int32 reportyear = 5;
  int32 value = 6;
  ProtoDateTime created = 7;
}

message CQLResult {
  bytes result = 1;
}

message CQLUpdate {
  repeated string statements = 1;
  bytes parameters = 2;
  google.protobuf.Int32Value consistency = 3;
  google.protobuf.BoolValue batch = 4;
}

service CQLServices {
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  rpc runDDL(CQLUpdate) returns (CQLResult) {}
}

Below is the implementation of the service function:

  val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
    row.rowid.asInstanceOf[Object],
    row.measureid.asInstanceOf[Object],
    row.statename,
    row.countyname,
    row.reportyear.asInstanceOf[Object],
    row.value.asInstanceOf[Object],
    CQLDateTimeNow
  )

  val cqlInsert = """
    |insert into testdb.AQMRPT(
    | rowid,
    | measureid,
    | statename,
    | countyname,
    | reportyear,
    | value,
    | created)
    | values(?,?,?,?,?,?,?)
  """.stripMargin

  val cqlActionStream = CassandraActionStream(cqlInsert, toParams)
    .setParallelism(2)          // example parallelism level
    .setProcessOrder(false)

  /*
  val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
    Flow[AQMRPTRow]
      .via(cqlActionStream.performOnRow)
  */

  val cqlActionFlow: Flow[AQMRPTRow, CQLResult, NotUsed] = {
    Flow[AQMRPTRow]
      .mapAsync(cqlActionStream.parallelism) { row =>
        if (IfExists(row.rowid))
          Future.successful(CQLResult(marshal(row.rowid)))              // placeholder reply payload
        else
          cqlActionStream.perform(row).map { _ => CQLResult(marshal(row.rowid)) }
      }
  }

  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
    Flow[AQMRPTRow]
      .via(cqlActionFlow)
  }

  private def IfExists(rowid: Long): Boolean = {
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
    val param = Seq(rowid.asInstanceOf[Object])
    val toRowId: Row => Long = r => r.getLong("rowid")
    val ctx = CQLQueryContext(cql, param)
    val src: Source[Long, NotUsed] = cassandraStream(ctx, toRowId)
    val fut = src.toMat(Sink.headOption)(Keep.right).run()
    val result = Await.result(fut, 3 seconds)
    log.info(s"checking existence: ${result}")
    result match {
      case Some(x) => true
      case None => false
    }
  }

In the code above we use the stream-processing methods of Cassandra-Engine's CassandraActionStream type. Note that we also try to run another stream inside a stream Flow: the IfExists function materializes a Source to determine whether a rowid already exists. Don't read too much into the practical value of this function; it is just a contrived example. Also, declaring rowid as Long is mandatory: cassandra's type matching is rigid and provides no implicit conversions, so an Int where a Long is expected is treated as a type error, and you cannot catch any meaningful error message for it.
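
A quick, hedged illustration of that last point: the datastax driver resolves codecs by the exact boxed Java type, so the value bound to a bigint column must be a java.lang.Long, never an Integer. A contrived sketch (the implicit session is assumed, as in the service code):

  // sketch only: binding a bigint column with an Int instead of a Long fails at bind time
  val prep = session.prepare("SELECT * FROM testdb.AQMRPT WHERE rowid = ? ALLOW FILTERING")

  val ok = prep.bind(1L.asInstanceOf[Object])   // boxes to java.lang.Long -> matches bigint
  // prep.bind(1.asInstanceOf[Object])          // boxes to java.lang.Integer -> rejected by the
  //                                            // driver's codec lookup, with no friendly hint
  //                                            // that the fix is simply Int -> Long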

The client calls this service as follows:

  val stub = CqlGrpcAkkaStream.stub(channel)

  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
    dbName = 'h2,
    statement = "select * from AQMRPT where statename='Arkansas'"
  )

  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    statename = rs.string("STATENAME"),
    countyname = rs.string("COUNTYNAME"),
    reportyear = rs.int("REPORTYEAR"),
    value = rs.int("VALUE"),
    // placeholder timestamp; the server side overwrites created with CQLDateTimeNow
    created = Some(ProtoDateTime(Some(ProtoDate(0, 0, 0)), Some(ProtoTime(0, 0, 0, 0))))
  )

  import scala.concurrent.duration._

  def transferRows: Source[CQLResult, NotUsed] = {
    log.info(s"**** calling transferRows ****")
    jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
      // .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .via(stub.transferRows)
  }

Note: the JDBC database is local to the client, whereas cassandra is the remote service.
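
Again, nothing moves until the Source is materialized. A small sketch of driving the transfer from the client and counting the acknowledgements, assuming the same implicit ActorSystem/Materializer/ExecutionContext setup as in the earlier DDL example:

  import akka.stream.scaladsl.Sink
  import scala.concurrent.Future

  // pull rows from the local JDBC db, push them through the remote cassandra service,
  // then count the CQLResult acknowledgements coming back
  val transferred: Future[Int] =
    transferRows.runWith(Sink.fold(0)((n, _: CQLResult) => n + 1))

  transferred.foreach(n => log.info(s"transferred ${n} rows into testdb.AQMRPT"))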

Finally, let's demonstrate a cassandra query. The .proto DDL definition:

message CQLQuery {
  string statement = 1;
  bytes parameters = 2;
  google.protobuf.Int32Value consistency = 3;
  google.protobuf.Int32Value fetchSize = 4;
}

service CQLServices {
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
  rpc runDDL(CQLUpdate) returns (CQLResult) {}
}

The service function code is as follows:

  def toCQLTimestamp(rs: Row) = {
    try {
      val tm = rs.getTimestamp("CREATED")
      if (tm == null) None
      else {
        val localdt = cqlGetTimestamp(tm)
        Some(ProtoDateTime(
          Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
          Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
      }
    }
    catch {
      case e: Exception => None
    }
  }

  val toAQMRow: Row => AQMRPTRow = rs => AQMRPTRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    statename = rs.getString("STATENAME"),
    countyname = rs.getString("COUNTYNAME"),
    reportyear = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    created = toCQLTimestamp(rs)
  )

  override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
    log.info("**** runQuery called on service side ***")
    Flow[CQLQuery]
      .flatMapConcat { q =>
        //unpack CQLQuery and construct the context
        var params: Seq[Object] = Nil
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
          params = unmarshal[Seq[Object]](q.parameters)
        log.info(s"**** query parameters: ${params} ****")
        val ctx = CQLQueryContext(q.statement, params)
        CQLEngine.cassandraStream(ctx, toAQMRow)
      }
  }

Two things worth a look here: the date/time conversion, and the marshalling/unmarshalling of the cassandra parameter Seq[Object]. The client code:
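
Those parameters travel as opaque bytes: the client packs a Seq[Object] into the bytes field with Java serialization and the service unpacks it again. A quick standalone sketch using the Converter from BytesConverter.scala listed below (the example values are arbitrary):

  import protobuf.bytes.Converter._   // marshal / unmarshal via Java serialization

  // client side: pack heterogeneous CQL parameters into a protobuf bytes field
  val packed: com.google.protobuf.ByteString =
    marshal(Seq[Object]("Arkansas", Int.box(2)))

  // service side: recover the Seq[Object] and hand it to CQLQueryContext
  val unpacked: Seq[Object] = unmarshal[Seq[Object]](packed)
  // unpacked == Seq("Arkansas", 2)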

  // the numeric parameter values below are illustrative placeholders
  val query = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
    parameters = marshal(Seq("Arkansas", 2.toInt))
  )
  val query2 = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3.toInt))
  )
  val query3 = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 3.toInt))
  )

  def queryRows: Source[AQMRPTRow, NotUsed] = {
    log.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

This part is fairly straightforward.
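
To consume the streamed rows, the client materializes queryRows with a Sink, for example (a sketch reusing the implicit runtime from the earlier examples):

  import akka.stream.scaladsl.Sink

  // print each AQMRPTRow streamed back from the remote cassandra query
  queryRows.runWith(Sink.foreach { row =>
    println(s"${row.rowid}\t${row.statename}\t${row.countyname}\t${row.value}")
  })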

Below is the complete source code covered in this discussion:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "compilerplugin" % "0.7.4",
"beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5"
)

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "gRPCCassandra"

version := "0.1"

scalaVersion := "2.12.6"

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

scalacOptions += "-Ypartial-unification"

libraryDependencies := Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
"io.grpc" % "grpc-netty" % grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
"io.monix" %% "monix" % "2.3.0",
// for GRPC Akkastream
"beyondthelines" %% "grpcakkastreamruntime" % "0.0.5",
// for scalikejdbc
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
"com.h2database" % "h2" % "1.4.196",
"mysql" % "mysql-connector-java" % "6.0.6",
"org.postgresql" % "postgresql" % "42.2.0",
"commons-dbcp" % "commons-dbcp" % "1.4",
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"com.typesafe.slick" %% "slick" % "3.2.1",
//for cassandra 340
"com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
"com.typesafe.akka" %% "akka-stream" % "2.5.13",
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.typelevel" %% "cats-core" % "1.1.0"
)

PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value,
// generate the akka stream files
grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
)

main/resources/application.conf

# JDBC settings
test {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolInitialSize =
poolMaxSize =
poolConnectionTimeoutMillis =
poolValidationQuery = "select 1 as one"
poolFactoryName = "commons-dbcp2"
}
}

db.mysql.driver = "com.mysql.cj.jdbc.Driver"
db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
db.mysql.user = "root"
db.mysql.password = ""
db.mysql.poolInitialSize =
db.mysql.poolMaxSize =
db.mysql.poolConnectionTimeoutMillis =
db.mysql.poolValidationQuery = "select 1 as one"
db.mysql.poolFactoryName = "bonecp"

# scallikejdbc Global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis =
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth =
}
dev {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolFactoryName = "hikaricp"
numThreads =
maxConnections =
minConnections =
keepAliveConnection = true
}
mysql {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/testdb"
user = "root"
password = ""
poolInitialSize =
poolMaxSize =
poolConnectionTimeoutMillis =
poolValidationQuery = "select 1 as one"
poolFactoryName = "bonecp"
}
postgres {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/testdb"
user = "root"
password = ""
poolFactoryName = "hikaricp"
numThreads =
maxConnections =
minConnections =
keepAliveConnection = true
}
}
# scallikejdbc Global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis =
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth =
}

main/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{} - %msg%n
            </Pattern>
        </layout>
    </appender>

    <logger name="sdp.cql" level="info" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>

    <logger name="demo.sdp.grpc.cql" level="info" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>

    <root level="error">
        <appender-ref ref="STDOUT" />
    </root>

</configuration>

main/protobuf/cql.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto"; option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo" // don't append file name to package
flat_package: true // generate one Scala file for all messages (services still get their own file)
single_file: true // add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage" // code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
}; /*
* Demoes various customization options provided by ScalaPBs.
*/ package sdp.grpc.services; message ProtoDate {
int32 yyyy = ;
int32 mm = ;
int32 dd = ;
} message ProtoTime {
int32 hh = ;
int32 mm = ;
int32 ss = ;
int32 nnn = ;
} message ProtoDateTime {
ProtoDate date = ;
ProtoTime time = ;
} message AQMRPTRow {
int64 rowid = ;
string countyname = ;
string statename = ;
int64 measureid = ;
int32 reportyear = ;
int32 value = ;
ProtoDateTime created = ;
} message CQLResult {
bytes result = ;
} message CQLQuery {
string statement = ;
bytes parameters = ;
google.protobuf.Int32Value consistency = ;
google.protobuf.Int32Value fetchSize = ;
} message CQLUpdate {
repeated string statements = ;
bytes parameters = ;
google.protobuf.Int32Value consistency = ;
google.protobuf.BoolValue batch = ;
} message HelloMsg {
string hello = ;
} service CQLServices {
rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
rpc runDDL(CQLUpdate) returns (CQLResult) {}
}

logging/log.scala

package sdp.logging

import org.slf4j.Logger

/**
* Logger which just wraps org.slf4j.Logger internally.
*
* @param logger logger
*/
class Log(logger: Logger) { // use var consciously to enable squeezing later
var isDebugEnabled: Boolean = logger.isDebugEnabled
var isInfoEnabled: Boolean = logger.isInfoEnabled
var isWarnEnabled: Boolean = logger.isWarnEnabled
var isErrorEnabled: Boolean = logger.isErrorEnabled def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
level match {
case 'debug | 'DEBUG => debug(msg)
case 'info | 'INFO => info(msg)
case 'warn | 'WARN => warn(msg)
case 'error | 'ERROR => error(msg)
case _ => // nothing to do
}
} def debug(msg: => String): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg)
}
} def debug(msg: => String, e: Throwable): Unit = {
if (isDebugEnabled && logger.isDebugEnabled) {
logger.debug(msg, e)
}
} def info(msg: => String): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg)
}
} def info(msg: => String, e: Throwable): Unit = {
if (isInfoEnabled && logger.isInfoEnabled) {
logger.info(msg, e)
}
} def warn(msg: => String): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg)
}
} def warn(msg: => String, e: Throwable): Unit = {
if (isWarnEnabled && logger.isWarnEnabled) {
logger.warn(msg, e)
}
} def error(msg: => String): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg)
}
} def error(msg: => String, e: Throwable): Unit = {
if (isErrorEnabled && logger.isErrorEnabled) {
logger.error(msg, e)
}
} }

logging/LogSupport.scala

package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

  /**
* Logger
*/
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))
}

filestreaming/FileStreaming.scala

package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._ import scala.concurrent.Await
import scala.concurrent.duration._ object Streaming {
def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = seconds)(
implicit mat: Materializer):ByteBuffer = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toByteBuffer
} def FileToByteArray(fileName: String, timeOut: FiniteDuration = seconds)(
implicit mat: Materializer): Array[Byte] = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
(Await.result(fut, timeOut)).toArray
} def FileToInputStream(fileName: String, timeOut: FiniteDuration = seconds)(
implicit mat: Materializer): InputStream = {
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
hd ++ bs
}
val buf = (Await.result(fut, timeOut)).toArray
new ByteArrayInputStream(buf)
} def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
implicit mat: Materializer) = {
val ba = new Array[Byte](byteBuf.remaining())
byteBuf.get(ba,,ba.length)
val baInput = new ByteArrayInputStream(ba)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
} def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
implicit mat: Materializer) = {
val bb = ByteBuffer.wrap(bytes)
val baInput = new ByteArrayInputStream(bytes)
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
} def InputStreamToFile(is: InputStream, fileName: String)(
implicit mat: Materializer) = {
val source = StreamConverters.fromInputStream(() => is)
source.runWith(FileIO.toPath(Paths.get(fileName)))
} }

jdbc/JDBCConfig.scala

package sdp.jdbc.config
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository /** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
import scala.collection.JavaConverters._ def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
def getIntOr(path: String, default: => Int = ) = if(c.hasPath(path)) c.getInt(path) else default
def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
def getDurationOr(path: String, default: => Duration = Duration.Zero) =
if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default def getPropertiesOr(path: String, default: => Properties = null): Properties =
if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default def toProperties: Properties = {
def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
val props = new Properties(null)
m.foreach { case (k, cv) =>
val v =
if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
else if(cv.unwrapped eq null) null
else cv.unwrapped.toString
if(v ne null) props.put(k, v)
}
props
}
toProps(c.root.asScala)
} def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
def getStringOpt(path: String) = Option(getStringOr(path))
def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
} object ConfigExtensionMethods {
@inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
} trait HikariConfigReader extends TypesafeConfigReader {
self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix => import ConfigExtensionMethods.configExtensionMethods def getFactoryName(dbName: Symbol): String = {
val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
} def hikariCPConfig(dbName: Symbol): HikariConfig = { val hconf = new HikariConfig()
val c: Config = config.getConfig(envPrefix + "db." + dbName.name) // Connection settings
if (c.hasPath("dataSourceClass")) {
hconf.setDataSourceClassName(c.getString("dataSourceClass"))
} else {
Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
}
hconf.setJdbcUrl(c.getStringOr("url", null))
c.getStringOpt("user").foreach(hconf.setUsername)
c.getStringOpt("password").foreach(hconf.setPassword)
c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) // Pool configuration
hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", ))
hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", ))
hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", ))
hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", ))
hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", ))
hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
val numThreads = c.getIntOr("numThreads", )
hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * ))
hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
hconf.setPoolName(c.getStringOr("poolName", dbName.name))
hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) // Equivalent of ConnectionPreparer
hconf.setReadOnly(c.getBooleanOr("readOnly", false))
c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
hconf.setCatalog(c.getStringOr("catalog", null)) hconf }
} import scalikejdbc._
trait ConfigDBs {
self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
getFactoryName(dbName) match {
case "hikaricp" => {
val hconf = hikariCPConfig(dbName)
val hikariCPSource = new HikariDataSource(hconf)
case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser {
var closed = false
override def close(): Unit = src.close()
}
if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
Class.forName(hconf.getDriverClassName)
}
ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(),
closer = HikariDataSourceCloser(hikariCPSource)))
}
case _ => {
val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
val cpSettings = readConnectionPoolSettings(dbName)
if (driver != null && driver.trim.nonEmpty) {
Class.forName(driver)
}
ConnectionPool.add(dbName, url, user, password, cpSettings)
}
}
} def setupAll(): Unit = {
loadGlobalSettings()
dbNames.foreach { dbName => setup(Symbol(dbName)) }
} def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
ConnectionPool.close(dbName)
} def closeAll(): Unit = {
ConnectionPool.closeAll
} } object ConfigDBs extends ConfigDBs
with TypesafeConfigReader
with StandardTypesafeConfig
with HikariConfigReader case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
with TypesafeConfigReader
with StandardTypesafeConfig
with HikariConfigReader
with EnvPrefix { override val env = Option(envValue)
}

jdbc/JDBCEngine.scala

package sdp.jdbc.engine
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import java.time._
import scala.concurrent.duration._
import scala.concurrent._
import sdp.file.Streaming._ import scalikejdbc.TxBoundary.Try._ import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream import sdp.logging.LogSupport object JDBCContext {
type SQLTYPE = Int
val SQL_EXEDDL=
val SQL_UPDATE =
val RETURN_GENERATED_KEYVALUE = true
val RETURN_UPDATED_COUNT = false } case class JDBCQueryContext[M](
dbName: Symbol,
statement: String,
parameters: Seq[Any] = Nil,
fetchSize: Int = ,
autoCommit: Boolean = false,
queryTimeout: Option[Int] = None) case class JDBCContext (
dbName: Symbol,
statements: Seq[String] = Nil,
parameters: Seq[Seq[Any]] = Nil,
fetchSize: Int = ,
queryTimeout: Option[Int] = None,
queryTags: Seq[String] = Nil,
sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
batch: Boolean = false,
returnGeneratedKey: Seq[Option[Any]] = Nil,
// no return: None, return by index: Some(1), by name: Some("id")
preAction: Option[PreparedStatement => Unit] = None,
postAction: Option[PreparedStatement => Unit] = None)
extends LogSupport { ctx => //helper functions def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == ) {
val nc = ctx.copy(preAction = action)
log.info("setPreAction> set")
nc
}
else {
log.info("setPreAction> JDBCContex setting error: preAction not supported!")
throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
}
} def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == ) {
val nc = ctx.copy(postAction = action)
log.info("setPostAction> set")
nc
}
else {
log.info("setPreAction> JDBCContex setting error: postAction not supported!")
throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
}
} def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(Seq(_parameters))
)
log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
} else {
log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!")
throw new IllegalStateException("JDBCContex setting error: option not supported!")
}
} def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(_parameters),
returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some()) else Seq(None))
)
log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
} else {
log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!")
throw new IllegalStateException("JDBCContex setting error: option not supported!")
}
} def appendBatchParameters(_parameters: Any*): JDBCContext = {
log.info(s"appendBatchParameters> appending: parameters: ${_parameters}")
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) {
log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
}
var matchParams = true
if (ctx.parameters != Nil)
if (ctx.parameters.head.size != _parameters.size)
matchParams = false
if (matchParams) {
val nc = ctx.copy(
parameters = ctx.parameters ++ Seq(_parameters)
)
log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
} else {
log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!")
throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
}
} def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
ctx.copy(
returnGeneratedKey = if (returnKey) Seq(Some()) else Nil
)
} def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
sqlType = JDBCContext.SQL_EXEDDL,
batch = false
)
log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
} def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
returnGeneratedKey = if (_returnGeneratedKey) Seq(Some()) else Seq(None),
sqlType = JDBCContext.SQL_UPDATE,
batch = false
)
log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
}
def setBatchCommand(_statement: String): JDBCContext = {
log.info(s"setBatchCommand> appending: statement: ${_statement}")
val nc = ctx.copy (
statements = Seq(_statement),
sqlType = JDBCContext.SQL_UPDATE,
batch = true
)
log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
nc
} } object JDBCEngine extends LogSupport {
import JDBCContext._ type JDBCDate = LocalDate
type JDBCDateTime = LocalDateTime
type JDBCTime = LocalTime def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn)
def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date,time)
def jdbcSetNow = LocalDateTime.now() def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate = sqlDate.toLocalDate
def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime = sqlTime.toLocalTime
def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
sqlTimestamp.toLocalDateTime type JDBCBlob = InputStream def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = seconds)(
implicit mat: Materializer) = FileToInputStream(fileName,timeOut) def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
implicit mat: Materializer) = InputStreamToFile(blob,fileName) private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
throw new IllegalStateException(message)
} def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
(implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
sql.iterator
.withDBSessionForceAdjuster(session => {
session.connection.setAutoCommit(ctx.autoCommit)
session.fetchSize(ctx.fetchSize)
})
}
log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
Source.fromPublisher[A](publisher)
} def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
extractor: WrappedResultSet => A)(
implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
rawSql.fetchSize(ctx.fetchSize)
try {
implicit val session = NamedAutoSession(ctx.dbName)
log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
sql.collection.apply[C]()
} catch {
case e: Exception =>
log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}") } } def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
if (ctx.sqlType != SQL_EXEDDL) {
log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
}
else {
log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
Future {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.statements.foreach { stm =>
val ddl = new SQLExecution(statement = stm, parameters = Nil)(
before = WrappedResultSet => {})(
after = WrappedResultSet => {}) ddl.apply()
}
"SQL_EXEDDL executed succesfully."
}
}
}
} def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
if (ctx.statements == Nil) {
log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
}
if (ctx.sqlType != SQL_UPDATE) {
log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
}
else {
if (ctx.batch) {
if (noReturnKey(ctx)) {
log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
val usql = SQL(ctx.statements.head)
.tags(ctx.queryTags: _*)
.batch(ctx.parameters: _*)
Future {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
usql.apply[Seq]()
Seq.empty[Long].to[C]
}
}
} else {
log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
Future {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
usql.apply[C]()
}
}
} } else {
log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
}
}
}
private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
val Some(key) :: xs = ctx.returnGeneratedKey
val params: Seq[Any] = ctx.parameters match {
case Nil => Nil
case p@_ => p.head
}
log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
Future {
NamedDB(ctx.dbName) localTx { implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val result = usql.apply()
Seq(result).to[C]
}
}
} private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
val params: Seq[Any] = ctx.parameters match {
case Nil => Nil
case p@_ => p.head
}
val before = ctx.preAction match {
case None => pstm: PreparedStatement => {}
case Some(f) => f
}
val after = ctx.postAction match {
case None => pstm: PreparedStatement => {}
case Some(f) => f
}
log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
Future {
NamedDB(ctx.dbName) localTx {implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val result = usql.apply()
Seq(result.toLong).to[C]
}
} } private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
if (noReturnKey(ctx))
singleTxUpdateNoReturnKey(ctx)
else
singleTxUpdateWithReturnKey(ctx)
} private def noReturnKey(ctx: JDBCContext): Boolean = {
if (ctx.returnGeneratedKey != Nil) {
val k :: xs = ctx.returnGeneratedKey
k match {
case None => true
case Some(k) => false
}
} else true
} def noActon: PreparedStatement=>Unit = pstm => {} def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
case Nil => Seq.fill(ctx.statements.size)(None)
case k@_ => k
}
val sqlcmd = ctx.statements zip ctx.parameters zip keys
log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
Future {
NamedDB(ctx.dbName) localTx { implicit session =>
session.fetchSize(ctx.fetchSize)
ctx.queryTimeout.foreach(session.queryTimeout(_))
val results = sqlcmd.map { case ((stm, param), key) =>
key match {
case None =>
new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
case Some(k) =>
new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
}
}
results.to[C]
}
}
} def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit ec: ExecutionContextExecutor,
cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
if (ctx.statements == Nil) {
log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
}
if (ctx.sqlType != SQL_UPDATE) {
log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
}
else {
if (!ctx.batch) {
if (ctx.statements.size == )
singleTxUpdate(ctx)
else
multiTxUpdates(ctx)
} else {
log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
}
}
} case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = , processInOrder: Boolean = true,
statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
jas =>
def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
import scala.concurrent._
val params = prepareParams(r)
log.info(s"JDBCActionStream.perform> db: ${dbName}, statement: ${statement}, parameters: ${params}")
Future {
NamedDB(dbName) autoCommit { session =>
session.execute(statement, params: _*)
}
r
}
}
def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
if (processInOrder)
Flow[R].mapAsync(parallelism)(perform)
else
Flow[R].mapAsyncUnordered(parallelism)(perform) } object JDBCActionStream {
def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
} }

cql/CassandraEngine.scala

package sdp.cql.engine

import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import com.datastax.driver.core._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.concurrent.duration.Duration
import sdp.logging.LogSupport object CQLContext {
// Consistency Levels
type CONSISTENCY_LEVEL = Int
val ANY: CONSISTENCY_LEVEL = 0x0000
val ONE: CONSISTENCY_LEVEL = 0x0001
val TWO: CONSISTENCY_LEVEL = 0x0002
val THREE: CONSISTENCY_LEVEL = 0x0003
val QUORUM : CONSISTENCY_LEVEL = 0x0004
val ALL: CONSISTENCY_LEVEL = 0x0005
val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
val SERIAL: CONSISTENCY_LEVEL = 0x000C def apply(): CQLContext = CQLContext(statements = Nil) def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
consistency match {
case ALL => ConsistencyLevel.ALL
case ONE => ConsistencyLevel.ONE
case TWO => ConsistencyLevel.TWO
case THREE => ConsistencyLevel.THREE
case ANY => ConsistencyLevel.ANY
case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
case QUORUM => ConsistencyLevel.QUORUM
case SERIAL => ConsistencyLevel.SERIAL
case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL }
} }
case class CQLQueryContext(
statement: String,
parameter: Seq[Object] = Nil,
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
fetchSize: Int =
) { ctx =>
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
ctx.copy(consistency = Some(_consistency))
def setFetchSize(pageSize: Int): CQLQueryContext =
ctx.copy(fetchSize = pageSize)
def setParameters(param: Seq[Object]): CQLQueryContext =
ctx.copy(parameter = param)
}
object CQLQueryContext {
def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = new CQLQueryContext(statement = stmt, parameter = param)
} case class CQLContext(
statements: Seq[String],
parameters: Seq[Seq[Object]] = Nil,
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
batch: Boolean = false
) extends LogSupport { ctx =>
def setBatch(bat: Boolean) = ctx.copy(batch = bat)
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
ctx.copy(consistency = Some(_consistency))
def setCommand(_statement: String, _parameters: Object*): CQLContext = {
log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}")
nc
}
def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
val nc = ctx.copy(statements = ctx.statements :+ _statement,
parameters = ctx.parameters ++ Seq(_parameters))
log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}")
nc
}
} object CQLEngine extends LogSupport {
import CQLContext._
import CQLHelpers._ import cats._, cats.data._, cats.implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._ def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int =
,extractor: Row => A)(
implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { val prepStmt = session.prepare(ctx.statement) var boundStmt = prepStmt.bind()
var params: Seq[Object] = Nil
if (ctx.parameter != Nil) {
params = processParameters(ctx.parameter)
boundStmt = prepStmt.bind(params:_*)
}
log.info(s"fetchResultPage> statement: ${prepStmt.getQueryString}, parameters: ${params}") ctx.consistency.foreach {consistency =>
boundStmt.setConsistencyLevel(consistencyLevel(consistency))} val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
(resultSet,(resultSet.asScala.view.map(extractor)).to[C])
}
def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
if (resultSet.isFullyFetched) {
(resultSet, None)
} else {
try {
val result = Await.result(resultSet.fetchMoreResults(), timeOut)
(result, Some((result.asScala.view.map(extractor)).to[C]))
} catch { case e: Throwable => (resultSet, None) }
} def cqlExecute(ctx: CQLContext)(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = { var invalidBat = false
if ( ctx.batch ) {
if (ctx.parameters == Nil)
invalidBat = true
else if (ctx.parameters.size < )
invalidBat = true;
}
if (!ctx.batch || invalidBat) {
if(invalidBat)
log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.") if (ctx.statements.size == ) {
var param: Seq[Object] = Nil
if (ctx.parameters != Nil) param = ctx.parameters.head
log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
}
else {
var params: Seq[Seq[Object]] = Nil
if (ctx.parameters == Nil)
params = Seq.fill(ctx.statements.length)(Nil)
else {
if (ctx.statements.size > ctx.parameters.size) {
log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
params = ctx.parameters ++ nils }
else
params = ctx.parameters
} val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
log.info(s"cqlExecute> multi-commands: ${commands}")
/*
//using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
//therefore, make sure no command replies on prev command effect
val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
cqlSingleUpdate(ctx.consistency, stmt, param)
}.toList val futList = lstCmds.sequence.map(_ => true) //must map to execute
*/
/*
//using traverse to have some degree of parallelism = max(runtimes)
//therefore, make sure no command replies on prev command effect
val futList = Future.traverse(commands) { case (stmt,param) =>
cqlSingleUpdate(ctx.consistency, stmt, param)
}.map(_ => true) Await.result(futList, 3 seconds)
Future.successful(true)
*/
// run sync directly
Future {
commands.foreach { case (stm, pars) =>
cqlExecuteSync(ctx.consistency, stm, pars)
}
true
}
}
}
else
cqlBatchUpdate(ctx)
}
def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = { val prepStmt = session.prepare(stmt) var boundStmt = prepStmt.bind()
var pars: Seq[Object] = Nil
if (params != Nil) {
pars = processParameters(params)
boundStmt = prepStmt.bind(pars: _*)
}
log.info(s"cqlSingleUpdate> statement: ${prepStmt.getQueryString}, parameters: ${pars}") cons.foreach { consistency =>
boundStmt.setConsistencyLevel(consistencyLevel(consistency))
}
session.executeAsync(boundStmt).map(_.wasApplied())
} def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
implicit session: Session, ec: ExecutionContext): Boolean = { val prepStmt = session.prepare(stmt) var boundStmt = prepStmt.bind()
var pars: Seq[Object] = Nil
if (params != Nil) {
pars = processParameters(params)
boundStmt = prepStmt.bind(pars: _*)
}
log.info(s"cqlExecuteSync> statement: ${prepStmt.getQueryString}, parameters: ${pars}") cons.foreach { consistency =>
boundStmt.setConsistencyLevel(consistencyLevel(consistency))
}
session.execute(boundStmt).wasApplied() } def cqlBatchUpdate(ctx: CQLContext)(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
var params: Seq[Seq[Object]] = Nil
if (ctx.parameters == Nil)
params = Seq.fill(ctx.statements.length)(Nil)
else
params = ctx.parameters
log.info(s"cqlBatchUpdate> statement: ${ctx.statements.head}, parameters: ${params}") val prepStmt = session.prepare(ctx.statements.head) var batch = new BatchStatement()
params.foreach { p =>
log.info(s"cqlBatchUpdate> batch with raw parameter: ${p}")
val pars = processParameters(p)
log.info(s"cqlMultiUpdate> batch with cooked parameters: ${pars}")
batch.add(prepStmt.bind(pars: _*))
}
ctx.consistency.foreach { consistency =>
batch.setConsistencyLevel(consistencyLevel(consistency))
}
session.executeAsync(batch).map(_.wasApplied())
} def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
(implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { val prepStmt = session.prepare(ctx.statement)
var boundStmt = prepStmt.bind()
val params = processParameters(ctx.parameter)
boundStmt = prepStmt.bind(params:_*)
ctx.consistency.foreach {consistency =>
boundStmt.setConsistencyLevel(consistencyLevel(consistency))} log.info(s"cassandraStream> statement: ${prepStmt.getQueryString}, parameters: ${params}")
CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor)
} case class CassandraActionStream[R](parallelism: Int = , processInOrder: Boolean = true,
statement: String, prepareParams: R => Seq[Object],
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
cas.copy(consistency = Some(_consistency)) def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
var prepStmt = session.prepare(statement)
var boundStmt = prepStmt.bind()
val params = processParameters(prepareParams(r))
boundStmt = prepStmt.bind(params: _*)
consistency.foreach { cons =>
boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
}
log.info(s"CassandraActionStream.perform> statement: ${prepStmt.getQueryString}, parameters: ${params}")
session.executeAsync(boundStmt).map(_ => r)
} def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
if (processInOrder)
Flow[R].mapAsync(parallelism)(perform)
else
Flow[R].mapAsyncUnordered(parallelism)(perform) def unloggedBatch[K](statementBinder: (
R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
val preparedStatement = session.prepare(statement)
log.info(s"CassandraActionStream.unloggedBatch> statement: ${preparedStatement.getQueryString}")
CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
parallelism,
preparedStatement,
statementBinder,
partitionKey)
} }
object CassandraActionStream {
def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
new CassandraActionStream[R]( statement=_statement, prepareParams = params)
} } object CQLHelpers extends LogSupport {
import java.nio.ByteBuffer
import java.io._
import java.nio.file._
import com.datastax.driver.core.LocalDate
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import java.time.Instant
import akka.stream.scaladsl._
import akka.stream._ implicit def listenableFutureToFuture[T](
listenableFuture: ListenableFuture[T]): Future[T] = {
val promise = Promise[T]()
Futures.addCallback(listenableFuture, new FutureCallback[T] {
def onFailure(error: Throwable): Unit = {
promise.failure(error)
()
}
def onSuccess(result: T): Unit = {
promise.success(result)
()
}
})
promise.future
} case class CQLDate(year: Int, month: Int, day: Int)
case object CQLTodayDate
case class CQLDateTime(year: Int, Month: Int,
day: Int, hour: Int, minute: Int, second: Int, millisec: Int = )
case object CQLDateTimeNow def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate =
dateToConvert.toInstant()
.atZone(java.time.ZoneId.systemDefault())
.toLocalDate() def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime =
dateToConvert.toInstant()
.atZone(java.time.ZoneId.systemDefault())
.toLocalTime() def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime=
new java.sql.Timestamp(
dateToConvert.getTime()
).toLocalDateTime() def processParameters(params: Seq[Object]): Seq[Object] = {
import java.time.{Clock,ZoneId}
log.info(s"[processParameters] input: ${params}")
val outParams = params.map { obj =>
obj match {
case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
case CQLTodayDate =>
val today = java.time.LocalDate.now()
LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")
case p@_ => p
}
}
log.info(s"[processParameters] output: ${params}")
outParams
}
class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
override def read: Int = {
if (!buf.hasRemaining) return -
buf.get
} override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
val length: Int = Math.min(len, buf.remaining)
buf.get(bytes, off, length)
length
}
}
object ByteBufferInputStream {
def apply(buf: ByteBuffer): ByteBufferInputStream = {
new ByteBufferInputStream(buf)
}
}
class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream { override def write(b: Int): Unit = {
buf.put(b.toByte)
} override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
buf.put(bytes, off, len)
}
}
object FixsizedByteBufferOutputStream {
def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
}
class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream { private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR override def write(b: Array[Byte], off: Int, len: Int): Unit = {
val position = buf.position
val limit = buf.limit
val newTotal: Long = position + len
if(newTotal > limit){
var capacity = (buf.capacity * increasing)
while(capacity <= newTotal){
capacity = (capacity*increasing)
}
increase(capacity.toInt)
} buf.put(b, , len)
} override def write(b: Int): Unit= {
if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
buf.put(b.toByte)
}
protected def increase(newCapacity: Int): Unit = {
buf.limit(buf.position)
buf.rewind
val newBuffer =
if (onHeap) ByteBuffer.allocate(newCapacity)
else ByteBuffer.allocateDirect(newCapacity)
newBuffer.put(buf)
buf.clear
buf = newBuffer
}
def size: Long = buf.position
def capacity: Long = buf.capacity
def byteBuffer: ByteBuffer = buf
}
object ExpandingByteBufferOutputStream {
val DEFAULT_INCREASING_FACTOR = 1.5f
def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
if (increasingBy <= ) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
val buffer: ByteBuffer =
if (onHeap) ByteBuffer.allocate(size)
else ByteBuffer.allocateDirect(size)
new ExpandingByteBufferOutputStream(buffer,onHeap)
}
def apply(size: Int): ExpandingByteBufferOutputStream = {
apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
  }

  def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
    apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
  }

  def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
    apply(size, increasingBy, false)
  }
}
def cqlFileToBytes(fileName: String): ByteBuffer = {
val fis = new FileInputStream(fileName)
  val b = new Array[Byte](fis.available)
val length = b.length
fis.read(b)
ByteBuffer.wrap(b)
}
def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
implicit mat: Materializer): Future[IOResult] = {
val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
source.runWith(FileIO.toPath(Paths.get(fileName)))
}
def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
val outputFormat = new java.text.SimpleDateFormat(fmt)
outputFormat.format(date)
}
def useJava8DateTime(cluster: Cluster) = {
//for jdk8 datetime format
cluster.getConfiguration().getCodecRegistry()
.register(InstantCodec.instance)
}
}
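
For reference, this is how the helpers above typically fit together: the Promise/FutureCallback pattern turns the driver's ResultSetFuture (a Guava ListenableFuture) into a Scala Future, and processParameters maps the CQLDate/CQLDateTimeNow placeholders into driver types before binding. The sketch below assumes the DataStax Java driver 3.x API and the Guava version it bundles (which still has the two-argument Futures.addCallback overload); executeAsFuture is a hypothetical helper name, not part of CQLHelpers:

// sketch only: executeAsFuture is a hypothetical helper, not part of CQLHelpers
import com.datastax.driver.core.{ResultSet, ResultSetFuture, Session, SimpleStatement}
import com.google.common.util.concurrent.{FutureCallback, Futures}
import scala.concurrent.{Future, Promise}

def executeAsFuture(session: Session, cql: String, params: Seq[Object]): Future[ResultSet] = {
  // convert the placeholder parameters, bind them and fire the statement asynchronously
  val rsf: ResultSetFuture =
    session.executeAsync(new SimpleStatement(cql, processParameters(params): _*))
  // same ListenableFuture => Future bridging as shown above
  val p = Promise[ResultSet]()
  Futures.addCallback(rsf, new FutureCallback[ResultSet] {
    def onSuccess(rs: ResultSet): Unit = { p.success(rs); () }
    def onFailure(e: Throwable): Unit = { p.failure(e); () }
  })
  p.future
}

// usage: CQLDateTimeNow becomes a java.time.Instant, handled by the registered InstantCodec
// executeAsFuture(session,
//   "insert into testdb.AQMRPT(rowid, statename, created) values(?, ?, ?)",
//   Seq(1L.asInstanceOf[Object], "Arkansas", CQLDateTimeNow))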

BytesConverter.scala

package protobuf.bytes
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString
object Converter {

  def marshal(value: Any): ByteString = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(value)
oos.close()
ByteString.copyFrom(stream.toByteArray())
  }

  def unmarshal[A](bytes: ByteString): A = {
val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val value = ois.readObject()
ois.close()
value.asInstanceOf[A]
  }
}
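
marshal/unmarshal simply run the payload through Java serialization and wrap the bytes in a protobuf ByteString, so anything passed through them must be Serializable. A quick round-trip example (values are illustrative):

import protobuf.bytes.Converter._
import com.google.protobuf.ByteString

val bytes: ByteString = marshal(Seq("Arkansas", 2))   // a Seq of Serializable values
val restored = unmarshal[Seq[Any]](bytes)             // == Seq("Arkansas", 2)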

CQLServices.scala

package demo.sdp.grpc.cql.server

import akka.NotUsed
import akka.stream.scaladsl._
import protobuf.bytes.Converter._
import com.datastax.driver.core._
import scala.concurrent.ExecutionContextExecutor
import sdp.grpc.services._
import sdp.cql.engine._
import CQLEngine._
import CQLHelpers._
import sdp.logging.LogSupport
import scala.concurrent._
import scala.concurrent.duration._
import akka.stream.ActorMaterializer

class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
                           mat: ActorMaterializer, session: Session)
  extends CqlGrpcAkkaStream.CQLServices with LogSupport {

  val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
row.rowid.asInstanceOf[Object],
row.measureid.asInstanceOf[Object],
row.statename,
row.countyname,
row.reportyear.asInstanceOf[Object],
row.value.asInstanceOf[Object],
CQLDateTimeNow
)
val cqlInsert ="""
|insert into testdb.AQMRPT(
| rowid,
| measureid,
| statename,
| countyname,
| reportyear,
| value,
| created)
| values(?,?,?,?,?,?,?)
""".stripMargin val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism()
.setProcessOrder(false) /*
val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
Flow[AQMRPTRow]
.via(cqlActionStream.performOnRow)
  */

  val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
Flow[AQMRPTRow]
.mapAsync(cqlActionStream.parallelism){ row =>
        if (IfExists(row.rowid))
          Future.successful(CQLResult(marshal(0)))                         // row already exists: count 0
        else
          cqlActionStream.perform(row).map { _ => CQLResult(marshal(1)) }  // row inserted: count 1
}
  }

  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
Flow[AQMRPTRow]
.via(cqlActionFlow)
  }

  private def IfExists(rowid: Long): Boolean = {
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
val param = Seq(rowid.asInstanceOf[Object])
val toRowId: Row => Long = r => r.getLong("rowid")
val ctx = CQLQueryContext(cql,param)
val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
    val fut = src.toMat(Sink.headOption)(Keep.right).run()
    val result = Await.result(fut, 3 seconds)
    log.info(s"checking existence: ${result}")
result match {
case Some(x) => true
case None => false
    }
  }

  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
Flow[HelloMsg]
.map {r => println(r) ; r}
  }

  override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
Flow[CQLUpdate]
.flatMapConcat { context =>
//unpack CQLUpdate and construct the context
val ctx = CQLContext(context.statements)
log.info(s"**** CQLContext => ${ctx} ***") Source
.fromFuture(cqlExecute(ctx))
.map { r => CQLResult(marshal(r)) }
}
  }

  def toCQLTimestamp(rs: Row) = {
try {
val tm = rs.getTimestamp("CREATED")
if (tm == null) None
else {
val localdt = cqlGetTimestamp(tm)
Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
}
}
catch {
case e: Exception => None
}
  }

  val toAQMRow: Row => AQMRPTRow = rs => AQMRPTRow(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
statename = rs.getString("STATENAME"),
countyname = rs.getString("COUNTYNAME"),
reportyear = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE"),
created = toCQLTimestamp(rs)
)
override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
log.info("**** runQuery called on service side ***")
Flow[CQLQuery]
.flatMapConcat { q =>
//unpack JDBCQuery and construct the context
var params: Seq[Object] = Nil
if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
params = unmarshal[Seq[Object]](q.parameters)
log.info(s"**** query parameters: ${params} ****")
val ctx = CQLQueryContext(q.statement,params)
        CQLEngine.cassandraStream(ctx, toAQMRow)
      }
  }
}
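
One thing worth noting in CQLStreamingServices: IfExists blocks the mapAsync callback with Await.result, which gives up part of the benefit of mapAsync. A non-blocking variant could return the headOption Future directly and let mapAsync compose it; and since rowid is the table's primary key, the ALLOW FILTERING clause is not needed for this lookup. A sketch of such a variant (ifExistsAsync is a hypothetical helper, not in the original code):

// sketch only, assumed to live inside CQLStreamingServices
private def ifExistsAsync(rowid: Long): Future[Boolean] = {
  val cql = "SELECT rowid FROM testdb.AQMRPT WHERE rowid = ?"   // primary-key lookup, no ALLOW FILTERING needed
  val ctx = CQLQueryContext(cql, Seq(rowid.asInstanceOf[Object]))
  cassandraStream(ctx, (r: Row) => r.getLong("rowid"))
    .toMat(Sink.headOption)(Keep.right)
    .run()
    .map(_.isDefined)
}

// the insert flow then stays fully asynchronous:
// Flow[AQMRPTRow].mapAsync(cqlActionStream.parallelism) { row =>
//   ifExistsAsync(row.rowid).flatMap {
//     case true  => Future.successful(CQLResult(marshal(0)))
//     case false => cqlActionStream.perform(row).map(_ => CQLResult(marshal(1)))
//   }
// }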

CQLServer.scala

package demo.sdp.grpc.cql.server

import java.util.logging.Logger
import com.datastax.driver.core._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import io.grpc.ServerBuilder
import sdp.grpc.services._
import sdp.cql.engine._
import CQLHelpers._

class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  def start(): Unit = {
server.start()
logger.info(s"Server started, listening on ${server.getPort}")
sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down")
stop()
System.err.println("*** server shut down")
}
()
  }

  def stop(): Unit = {
server.shutdown()
  }

  /**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
def blockUntilShutdown(): Unit = {
server.awaitTermination()
}
}

object CQLServer extends App {
implicit val cqlsys = ActorSystem("cqlSystem")
implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
    .Builder()
    .addContactPoints("localhost")
    .withPort(9042)            // default Cassandra native-protocol port
    .build()
  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)          // gRPC listening port (example value, must match the client)
.addService(
CqlGrpcAkkaStream.bindService(
new CQLStreamingServices
)
).build()
)
server.start()
// server.blockUntilShutdown()
scala.io.StdIn.readLine()
session.close()
cluster.close()
mat.shutdown()
cqlsys.terminate()
}
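
The demo server above uses StdIn.readLine() to keep the JVM alive, which is convenient for experiments. For a non-interactive run, the same bootstrap can block on the gRPC server and release the Cassandra and Akka resources in a shutdown hook instead; a minimal sketch reusing only the names already defined in CQLServer:

// sketch: replace the readLine()-based tail of CQLServer with
sys.addShutdownHook {
  session.close()
  cluster.close()
  mat.shutdown()
  cqlsys.terminate()
}
server.start()
server.blockUntilShutdown()   // waits until the gRPC server is terminated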

CQLClient.scala

package demo.sdp.grpc.cql.client

import sdp.grpc.services._
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.logging.LogSupport
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
import scala.util._
import scala.concurrent.ExecutionContextExecutor

class CQLStreamClient(host: String, port: Int)(
  implicit ec: ExecutionContextExecutor) extends LogSupport {

  val channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext(true)
    .build()

  val stub = CqlGrpcAkkaStream.stub(channel)

  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
dbName = 'h2,
statement = "select * from AQMRPT where statename='Arkansas'"
  )

  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
rowid = rs.long("ROWID"),
measureid = rs.long("MEASUREID"),
statename = rs.string("STATENAME"),
countyname = rs.string("COUNTYNAME"),
reportyear = rs.int("REPORTYEAR"),
value = rs.int("VALUE"),
    created = Some(ProtoDateTime(Some(ProtoDate(0, 0, 0)), Some(ProtoTime(0, 0, 0, 0))))  // placeholder: the server stamps CQLDateTimeNow on insert
  )

  import scala.concurrent.duration._

  def transferRows: Source[CQLResult, NotUsed] = {
log.info(s"**** calling transferRows ****")
jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
// .throttle(1, 500.millis, 1, ThrottleMode.shaping)
.via(stub.transferRows)
  }

  def echoHello: Source[HelloMsg,NotUsed] = {
val row = HelloMsg("hello world!")
    val rows = List.fill[HelloMsg](10)(row)   // number of echo messages is illustrative
Source
.fromIterator(() => rows.iterator)
.via(stub.clientStreaming)
}
val query0 = CQLQuery(
statement = "select * from testdb.AQMRPT"
  )

  val query = CQLQuery(
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
parameters = marshal(Seq("Arkansas", .toInt))
)
val query2 = CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Colorado", .toInt))
)
val query3= CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Arkansas", .toInt))
) def queryRows: Source[AQMRPTRow,NotUsed] = {
log.info(s"running queryRows ...")
Source
.single(query)
.via(stub.runQuery)
  }

  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"

  val createCQL ="""
CREATE TABLE testdb.AQMRPT (
rowid bigint primary key,
measureid bigint,
statename text,
countyname text,
reportyear int,
value int,
created timestamp
)""" val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
def createTbl: Source[CQLResult,NotUsed] = {
log.info(s"running createTbl ...")
Source
.single(cqlddl)
.via(stub.runDDL)
  }
}

object EchoHelloClient extends App {
implicit val system = ActorSystem("EchoNumsClient")
implicit val mat = ActorMaterializer.create(system)
implicit val ec = system.dispatcher
val client = new CQLStreamClient("localhost", ) client.echoHello.runForeach(println) scala.io.StdIn.readLine()
mat.shutdown()
system.terminate() } object TransferRows extends App { import sdp.jdbc.config._ implicit val system = ActorSystem("JDBCServer")
implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val client = new CQLStreamClient("localhost", 50051)
  val fut = client.transferRows.runFold(0){(a,b) => a + unmarshal[Int](b.result)}
fut.onComplete {
case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.")
case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
ConfigDBsWithEnv("dev").close('h2)
mat.shutdown()
  system.terminate()
}

object QueryRows extends App {
implicit val system = ActorSystem("QueryRows")
implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)
  val fut = client.queryRows.runForeach { r => println(r) }
fut.onComplete {
case scala.util.Success(d) => println(s"done querying.")
case Failure(e) => println(s"!!!!!query error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
mat.shutdown()
  system.terminate()
}

object RunDDL extends App {
implicit val system = ActorSystem("RunDDL")
implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  client.createTbl.runForeach { r => println(unmarshal[Boolean](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}
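
The demo Apps above are meant to be run one at a time: RunDDL first, then TransferRows, then QueryRows. If the whole exercise should run as a single program, the three client calls can be sequenced through their completion Futures. A sketch under the same assumptions as above (RunAll is a hypothetical object and the port value is illustrative):

object RunAll extends App {
  import sdp.jdbc.config._

  implicit val system = ActorSystem("RunAll")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val client = new CQLStreamClient("localhost", 50051)

  val done = for {
    _   <- client.createTbl.runWith(Sink.ignore)                                   // DROP + CREATE
    cnt <- client.transferRows.runFold(0)((a, b) => a + unmarshal[Int](b.result))  // JDBC -> CQL transfer
    _   <- client.queryRows.runForeach(println)                                    // read back
  } yield cnt

  done.onComplete {
    case Success(cnt) => println(s"transferred ${cnt} rows.")
    case Failure(e)   => println(s"!!!!!streaming error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}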
