前面介绍了事件源(EventSource)和集群(cluster),现在到了讨论CQRS的时候了。CQRS即读写分离模式,由独立的写方程序和读方程序组成,具体原理在以前的博客里介绍过了。akka-typed应该自然支持CQRS模式,最起码本身提供了对写方编程的支持,这点从EventSourcedBehavior 可以知道。akka-typed提供了新的EventSourcedBehavior-Actor,极大方便了对persistentActor的应用开发,但同时也给编程者造成了一些限制。如手工改变状态会更困难了、EventSourcedBehavior不支持多层式的persist,也就是说通过persist某些特定的event然后在event-handler程序里进行状态处理是不可能的了。我这里有个例子,是个购物车应用:当完成支付后需要取个快照(snapshot),下面是这个snapshot的代码:

       snapshotWhen {
(state,evt,seqNr) => CommandHandler.takeSnapshot(state,evt,seqNr)
}
... def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = {
if (evt.isInstanceOf[Events.PaymentMade]
|| evt.isInstanceOf[Events.VoidVoucher.type]
|| evt.isInstanceOf[Events.SuspVoucher.type])
if (state.items.isEmpty) {
log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")
true
} else
false
else
false }

判断event类型是没有问题的,因为正是当前的事件,但另一个条件是购物车必须是清空了的。这个有点为难,因为这个状态要依赖这几个event运算的结果才能确定,也就是下一步,但确定结果又需要对购物车内容进行计算,好像是个死循环。在akka-classic里我们可以在判断了event运算结果后,如果需要改变状态就再persist一个特殊的event,然后在这个event的handler进行状态处理。没办法,EventSourcedBehavior不支持多层persist,只有这样做:

      case PaymentMade(acct, dpt, num, ref,amount) =>
...
writerInternal.lastVoucher = Voucher(vchs, vItems)
endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)
Voucher(vchs.nextVoucher, List())
...

我只能先吧当前状态保存下来、进行结单运算、然后清空购物车,这样snapshot就可以顺利进行了。

好了,akka的读方编程是通过PersistentQuery实现的。reader的作用就是把event从数据库读出来后再恢复成具体的数据格式。我们从reader的调用了解一下这个应用里reader的实现细节:

    val readerShard = writerInternal.optSharding.get
val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)

可以看到这个reader是一个集群分片,sharding-entity。想法是每单完成购买后发个消息给一个entity、这个entity再完成reader功能后自动终止,立即释放出占用的资源。reader-actor的定义如下:

object POSReader extends LogSupport {
val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader") def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = {
log.stepOn = trace
implicit var pid: PID = PID("","")
Behaviors.supervise(
Behaviors.setup[Command] { ctx =>
Behaviors.withTimers { timer =>
implicit val ec = ctx.executionContext
Behaviors.receiveMessage {
case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>
pid = PID(shopid, posid)
log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))
val futReadSaveNExport = for {
txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
_ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,
{ if(txntype == Events.TXNTYPE.voidall)
txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
else txnitems },
trace)(ctx.system.toClassic, pid)
} yield ()
ctx.pipeToSelf(futReadSaveNExport) {
case Success(_) => {
timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
StopReader
}
case Failure(err) =>
log.error(s"POSReader: Error: ${err.getMessage}")
timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)
StopReader
} Behaviors.same
case StopReader =>
Behaviors.same
case ReaderFinish(shopid, posid, vchnum) =>
Behaviors.stopped(
() => log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid))
)
}
}
}
).onFailure(SupervisorStrategy.restart)
}

reader就是一个普通的actor。值得注意的是读方程序可能是一个庞大复杂的程序,肯定需要分割成多个模块,所以我们可以按照流程顺序进行模块功能切分:这样下面的模块可能会需要上面模块产生的结果才能继续。记住,在actor中绝对避免阻塞线程,所有的模块都返回Future, 然后用for-yield串起来。上面我们用了ctx.pipeToSelf 在Future运算完成后发送ReaderFinish消息给自己,通知自己停止。

在这个例子里我们把reader任务分成:

1、从数据库读取事件

2、事件重演一次产生状态数据(购物车内容)

3、将形成的购物车内容作为交易单据项目存入数据库

4、向用户提供的restapi输出交易数据

event读取是通过cassandra-persistence-plugin实现的:

    val query =
PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) // issue query to journal
val source: Source[EventEnvelope, NotUsed] =
query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq) // materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

这部分比较简单:定义一个PersistenceQuery,用它产生一个Source,然后run这个Source获取Future[List[Any]]。

重演事件产生交易数据:

    def buildVoucher(actions: List[Any]): List[TxnItem] = {
log.step(s"POSReader: read actions: $actions")
val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
val listOfActions = onlytxns.reverse zip (LazyList from ) //zipWithIndex
listOfActions.foreach { case (txn,idx) =>
txn.asInstanceOf[Action] match {
case Voided(_) =>
case ti@_ =>
curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)
if(voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr)
log.step(s"POSReader: voided txnitem: $curTxnItem")
}
val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)
vchState = vch.header
vchItems = vch.txnItems
log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
}
}
log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
vchItems.txnitems
}

重演List[Event],产生了List[TxnItem]。

向数据库里写List[TxnItem]:

 def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(
implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

  val txnitems: Future[List[Events.TxnItem]] = for {
lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions
else FastFuture.successful(lst1)
items <- FastFuture.successful( buildVoucher(lstTxns) )
_ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
_ <- session.close(ec)
} yield items

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

  val txnitems: Future[List[Events.TxnItem]] = for {
lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions
else FastFuture.successful(lst1)
items <- FastFuture.successful( buildVoucher(lstTxns) )
_ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
_ <- session.close(ec)
} yield items

注意:这个for返回的Future[List[TxnItem]],是提供给restapi输出功能的。在那里List[TxnItem]会被转换成json作为post的包嵌数据。

现在所有子任务的返回结果类型都是Future了。我们可以再用for来把它们串起来:

             val futReadSaveNExport = for {
txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)
_ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,
{ if(txntype == Events.TXNTYPE.voidall)
txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))
else txnitems },
trace)(ctx.system.toClassic, pid)
} yield ()

说到EventSourcedBehavior,因为用了cassandra-plugin,忽然想起配置文件里新旧有很大区别。现在这个application.conf是这样的:

akka {
loglevel = INFO
actor {
provider = cluster
serialization-bindings {
"com.datatech.pos.cloud.CborSerializable" = jackson-cbor
}
}
remote {
artery {
canonical.hostname = "192.168.11.189"
canonical.port =
}
}
cluster {
seed-nodes = [
"akka://cloud-pos-server@192.168.11.189:2551"]
sharding {
passivate-idle-entity-after = m
}
}
# use Cassandra to store both snapshots and the events of the persistent actors
persistence {
journal.plugin = "akka.persistence.cassandra.journal"
snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
}
}
akka.persistence.cassandra {
# don't use autocreate in production
journal.keyspace = "poc2g"
journal.keyspace-autocreate = on
journal.tables-autocreate = on
snapshot.keyspace = "poc2g_snapshot"
snapshot.keyspace-autocreate = on
snapshot.tables-autocreate = on
} datastax-java-driver {
basic.contact-points = ["192.168.11.189:9042"]
basic.load-balancing-policy.local-datacenter = "datacenter1"
}

akka.persitence.cassandra段落里可以定义keyspace名称,这样新旧版本应用可以共用一个cassandra,同时在线。

最新文章

  1. STM32F412应用开发笔记之二:基本GPIO控制
  2. C#并行
  3. Android获取短信验证码
  4. maven工程模块化
  5. TweenMax学习一
  6. JAVA求集合中的组合
  7. 多线程Server client
  8. POJ1386Play on Words[有向图欧拉路]
  9. 第八章 企业项目开发--分布式缓存memcached
  10. ASP.Net MVC如何访问的静态页面
  11. Java的多线程+Socket 后台 Ver 2.0
  12. Qemu+gdb跟踪内核源码
  13. Codeforces Round #276 (Div. 2)
  14. android 工具类之图片加工
  15. Content-Type
  16. Chromium on Android: Android在系统Chromium为了实现主消息循环分析
  17. iOS基础 - 多媒体
  18. 关于props和state以及redux中的state
  19. oracle_一次移动数据库dbf文件的操作
  20. 线上服务器上安装的VNCServer不能正常工作

热门文章

  1. 【Android】SDK的配置
  2. 【Storm】编程模型
  3. Mybatis 的动态SQL,批量增删查改
  4. Java实现 LeetCode 775 全局倒置与局部倒置(分析题)
  5. Java实现 蓝桥杯VIP 算法训练 接水问题
  6. Java实现 LeetCode 234 回文链表
  7. Java实现 蓝桥杯VIP 算法训练 比较字符串
  8. SQL手工注入绕过过滤
  9. 全网最全测试点总结:N95 口罩应该如何测试?
  10. mysql导入超大sql文件