Akka Cluster Sharding

Sharding hierarchy

Cluster (multiple nodes) —> each node (one shard region) —> each shard region (multiple shards) —> each shard (multiple entities)

Entity: an actor managed by a shard
Shard: a group of entities managed together as a unit
ShardRegion: the shard region, started on each cluster node; it hosts and manages the shards on that node
ShardCoordinator: a cluster-singleton that decides which shard region owns each shard
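
To make the hierarchy concrete, here is a minimal sketch (the constant and helper names are illustrative, not part of Akka's API): many entity IDs collapse onto a fixed, smaller set of shard IDs, and each node's ShardRegion hosts a subset of those shards.

// Hypothetical helper for illustration only; numeric entity IDs are assumed.
const val NUMBER_OF_SHARDS = 10

fun shardIdFor(entityId: String): String =
    (entityId.toInt() % NUMBER_OF_SHARDS).toString()

fun demo() {
    // entities 3, 13 and 23 all land in shard "3"
    listOf("3", "13", "23").forEach { println("entity $it -> shard ${shardIdFor(it)}") }
}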

How it works

A ShardRegion is started on each node.

A message carrying an entity ID --> is sent to the local ShardRegion, which requests the shard's location --> the ShardCoordinator --> decides which ShardRegion will own the Shard -->

that ShardRegion acknowledges the request and creates a Shard supervisor as a child actor --> the shard actor creates the entity --> the ShardRegion and Shard can then locate the entity and deliver messages to it.
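
In code, a sender never resolves an entity's location itself; it only talks to the local ShardRegion. A minimal sketch, assuming sharding for a type named "dogShard" has already been started on this node and that DogMsg is the message type defined in the example below:

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding

// Sketch only: the local ShardRegion asks the ShardCoordinator where the shard
// lives (the first time) and then forwards the message to the owning entity.
fun sendToDog(actorSystem: ActorSystem, id: Int, text: String) {
    val region: ActorRef = ClusterSharding.get(actorSystem).shardRegion("dogShard")
    region.tell(DogMsg(id, text), ActorRef.noSender())
}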

How messages are mapped to shards

All shard regions together form a distributed shard-management layer. A message carrying an entity ID is simply sent to the local ShardRegion, and the sharding layer routes it to the correct node. When creating a ShardRegion you must supply an implementation of ShardRegion.MessageExtractor, i.e. functions that extract the entity ID and the shard ID from an incoming message.
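
If hashing entity IDs over a fixed number of shards is enough, classic sharding also provides ShardRegion.HashCodeMessageExtractor, where only entityId has to be implemented and the shard ID is derived for you. A minimal sketch (it reuses the DogMsg type defined in the example below); the hand-written extractor actually used in this article follows in the example.

import akka.cluster.sharding.ShardRegion

// Sketch of the built-in hash-based extractor: 10 is the maximum number of shards,
// and the shard ID is derived from entityId's hash code modulo that number.
class DogMessageExtractor : ShardRegion.HashCodeMessageExtractor(10) {
    override fun entityId(message: Any): String =
        if (message is DogMsg) message.id.toString()
        else throw IllegalArgumentException("Unrecognized message type $message")
}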

Example implementation

package shard

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.japi.pf.ReceiveBuilder
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.io.Serializable
import java.util.*

/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: cluster sharding example
 */

// The entity actor that gets distributed across the cluster
class DogActor : AbstractActor() {

    private val log = LoggerFactory.getLogger(DogActor::class.java)

    override fun createReceive(): Receive {
        return ReceiveBuilder.create().matchAny(this::receive).build()
    }

    fun receive(obj: Any) {
        log.info("Received message: $obj")
        if (obj is DogMsg) {
            log.info("${obj.id} ${obj.msg}")
        }
    }
}

// Message definition (must carry an entity ID so it can be sharded)
data class DogMsg(var id: Int, var msg: String) : Serializable

// Sharding rules
class ShardExtractor : ShardRegion.MessageExtractor {

    // Extract the entity ID; each entity ID corresponds to one actor instance
    override fun entityId(message: Any?): String {
        if (message is DogMsg) {
            return message.id.toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Derive the shard ID from the entity ID
    override fun shardId(message: Any?): String {
        //val numberOfShards = 10 // simple modulo over the shard count: message.id % numberOfShards
        if (message is DogMsg) {
            //return (message.id % 10).toString()
            return message.id.toString()
        } else {
            throw RuntimeException("Unrecognized message type $message")
        }
    }

    // Messages can be unwrapped here before they reach the entity
    override fun entityMessage(message: Any): Any {
        return message
    }
}

// Message dispatched to entities when their shard is stopped (hand-off)
object handOffStopMessage

fun createActorSystem(port: Int): ActorSystem {
    val config = ConfigFactory.parseString(
        "akka.remote.artery.canonical.port=$port"
    ).withFallback(
        ConfigFactory.load()
    )
    return ActorSystem.create("custerA", config)
}

fun startShardRegion(port: Int) {
    val actorSystem = createActorSystem(port)
    val settings = ClusterShardingSettings.create(actorSystem) //.withRole("ClusterShardRole")
    val shardReg = ClusterSharding.get(actorSystem).start(
        "dogShard",
        Props.create(DogActor::class.java),
        settings,
        ShardExtractor(),
        ShardCoordinator.LeastShardAllocationStrategy(10, 1),
        handOffStopMessage
    )
    for (i in 1..10) {
        shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender())
        Thread.sleep(3000)
    }
}

fun shardRegProxy() {
    val actorSystem = createActorSystem(2663)
    // startProxy: proxy-only mode, this node hosts no entities itself,
    // but it knows how to route messages to the right place
    ClusterSharding.get(actorSystem)
        .startProxy("dogShard", Optional.empty(), ShardExtractor())
        .let { println(" shard proxy $it started.") }
    Thread.sleep(3000)
    val shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard")
    for (i in 1..100) {
        shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender())
        Thread.sleep(1500)
    }
}

Then start the entry points separately, one per process:

// Node 1
fun main() {
    startShardRegion(2661)
}

// Node 2
fun main() {
    startShardRegion(2662)
}

// Proxy node
fun main() {
    shardRegProxy()
}

Configuration file:

akka {
  actor {
    provider = "cluster"
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 0
    canonical.hostname = 127.0.0.1
  }

  cluster {
    seed-nodes = [
      "akka://custerA@127.0.0.1:2661",
      "akka://custerA@127.0.0.1:2662"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

Note

The example only works if all nodes join the same cluster, so the name passed to ActorSystem.create(name, config) must be identical on every node! (I spent ages debugging why messages were never delivered; this turned out to be the cause.)

Same cluster, same ActorSystem name!

Otherwise you will get this exception:

No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed
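
One simple way to avoid the mismatch is to keep the system name in a single constant and reuse it when creating the ActorSystem. A sketch; the constant is illustrative and not part of the original code, and the seed-node addresses in the configuration must use the same name:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Illustrative constant: one definition keeps ActorSystem.create(...) in sync
// with the akka://custerA@host:port addresses listed in seed-nodes.
const val CLUSTER_NAME = "custerA"

fun createActorSystem(port: Int): ActorSystem {
    val config = ConfigFactory.parseString("akka.remote.artery.canonical.port=$port")
        .withFallback(ConfigFactory.load())
    return ActorSystem.create(CLUSTER_NAME, config)
}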
