原同步服务正常,因需,对方单表新增字段,超过22条

sbt assembly 编译出错

too many elements for tuple: 26, allowed: 22

scala case class 最多只支持22个构造参数

遂找解决办法

https://underscore.io/blog/posts/2016/10/11/twenty-two.html

https://github.com/slick/slick/issues/519#issuecomment-48327043

https://github.com/underscoreio/slickless/issues/16

最终应用slickless解决

部分示例

import slick.jdbc.H2Profile.api._
import shapeless.{ HList, ::, HNil, Generic }
import slickless._ case class User(id: Long, email: String) class Users(tag: Tag) extends Table[User](tag, "users") {
def id = column[Long]( "id", O.PrimaryKey, O.AutoInc )
def email = column[String]("email") def * = (id :: email :: HNil).mappedWith(Generic[User])
} lazy val users = TableQuery[Users]

 

基于 tupled 需要作转换

    case (BYOFFSET_GET,lastid:Int) => {
println("sync start by",lastid)
val words = TableQuery[Words]
// val action=ORIGIN_DB.run(Word.filter(_.id > lastid).take(PAGE_SIZE).result)
val action=ORIGIN_DB.run(words.filter(_.id > lastid).take(PAGE_SIZE).result)
action.onComplete(data=>{
println("sync get data",data.get.length)
if (data.isSuccess){
val words=data.get.toList.map(a=>{
Word.tupled(a)
})
if (words.length>0){
Future {
println(s"Blocking next page 1s start")
TimeUnit.SECONDS.sleep(1)
println(s"Blocking next page 1s finished")
//同步时只考虑insert
if(is_just_insert){
self !(BYOFFSET_INSERT,words)
}else{
//如果会更新历史数据
self !(BYOFFSET_INSERT_UPDATE,words)
}
}
}else{
//拿到最新数据 等待5分钟
Future {
println(s"Blocking future 5m start")
TimeUnit.MINUTES.sleep(5)
println(s"Blocking future 5m finished")
self !(BYOFFSET_GET,lastid)
}
}
}else{
      Future {
       println(s"Blocking table "+tablename+" future 5m start")
       TimeUnit.MINUTES.sleep(5)
       println(s"Blocking table "+tablename+" future 5m finished")
       self !(BYOFFSET_GET,lastid)
      }
}
      })
}
//插入数据
case (BYOFFSET_INSERT,words:List[Word])=>{
println("insert start",words.length)
val word = TableQuery[Words]
word.++=(words.map(a=>{
Word.unapply(a).get
}))
//Word.+=(Word.unapply(words.head).get)
val insertActions = DBIO.seq(
word.++=(words.map(a=>{
Word.unapply(a).get
}))
)
DEST_DB.run(insertActions).onComplete(data=>{
if (data.isSuccess){
println("insert data result",data)
//添加成功后更新last表
val lastid=words.last.id
Sync.lastActor !(BYOFFSET_UPSERT_OFFSET,tablename,lastid)
}else{
self !(BYOFFSET_INSERT,words)
}
})
}

 

基于HLists/Generics 则不必

  case (BYOFFSET_GET,lastid:Int) => {
println("table "+tablename+" sync start by",lastid)
val users = TableQuery[Users]
// val action=ORIGIN_DB.run(User.filter(_.id > lastid).take(PAGE_SIZE).result)
val action=ORIGIN_DB.run(users.filter(_.id > lastid).take(PAGE_SIZE).result)
action.onComplete(data=>{
println("table "+tablename+" sync get data",data.get.length)
if (data.isSuccess){
val users=data.get.toList
if (users.length>0){
Future {
println(s"Blocking table "+tablename+" next page 1s start")
TimeUnit.SECONDS.sleep(1)
println(s"Blocking table "+tablename+" next page 1s finished")
//同步时只考虑insert
if(is_just_insert){
self !(BYOFFSET_INSERT,users)
}else{
//如果会更新历史数据
self !(BYOFFSET_INSERT_UPDATE,users)
}
}
}else{
Future {
println(s"Blocking table "+tablename+" future 5m start")
TimeUnit.MINUTES.sleep(5)
println(s"Blocking table "+tablename+" future 5m finished")
self !(BYOFFSET_GET,lastid)
}
}
}else{
Future {
println(s"Blocking table "+tablename+" future 5m start")
TimeUnit.MINUTES.sleep(5)
println(s"Blocking table "+tablename+" future 5m finished")
self !(BYOFFSET_GET,lastid)
}
}
})
}
// 插入数据
case (BYOFFSET_INSERT,users:List[User])=>{
println("table "+tablename+" insert start",users.length)
val user = TableQuery[Users]
//User.+=(User.unapply(users.head).get)
val insertActions = DBIO.seq(
user++=users
)
DEST_DB.run(insertActions).onComplete(data=>{
if (data.isSuccess){
println("table "+tablename+" insert data result",data)
//添加成功后更新last表
val lastid=users.last.id
Sync.lastActor !(BYOFFSET_UPSERT_OFFSET,tablename,lastid)
}else{
self !(BYOFFSET_INSERT,users)
}
})
}  

scala slick 异构表同步服务

https://github.com/cclient/ScalaMysqlSync

最新文章

  1. java用selenium库控制chrome
  2. LeetCode 【347. Top K Frequent Elements】
  3. SQL超过锁请求
  4. python 代码片段26
  5. pods的安装和使用
  6. C#关于MSMQ通过HTTP远程发送专有队列消息的问题
  7. Asp.net开启分布式事务管理
  8. 经典算法系列--kmp
  9. RH033读书笔记(5)-Lab 6 Exploring the Bash Shell
  10. .net图片快速去底(去除白色背景)
  11. [置顶] github 出现 Permission denied (publickey)的解决
  12. phpStudy The requested URL /web/index.php was not found on this server
  13. 我的C#跨平台之旅(四):使用AOP(filter、attribute)进行系统增强
  14. SpringBoot系列: Json的序列化和反序列化
  15. mysql表备份的一种方式
  16. 基于TLS的EAP 认证方法
  17. Python中让MySQL查询结果返回字典类型的方法
  18. Swift中的Any 与 AnyObject、AnyClass的区别?
  19. AVD Android模拟器系统
  20. Nodejs文件监控chokidar

热门文章

  1. No module named cv2 报错处理
  2. 十六进制转化二进制[c]
  3. Shell程序实例集锦一
  4. [C/C++]C/C++计算代码的运行时间
  5. CSS中的 position与Grid Layout
  6. 201412-2 Z字形扫描 Java
  7. Evaluation metrics for classification
  8. python中的变量对象小结2
  9. 给Office文档添加水印效果【测试有效】
  10. 一、Cookie和Session介绍