前面的文章都是基于local模式分析的,现在我们简要分析一下在remote模式下,ActorSystem的创建过程。

final val ProviderClass: String =
setup.get[BootstrapSetup]
.flatMap(_.actorRefProvider).map(_.identifier)
.getOrElse(getString("akka.actor.provider")) match {
case "local" ⇒ classOf[LocalActorRefProvider].getName
// these two cannot be referenced by class as they may not be on the classpath
case "remote" ⇒ "akka.remote.RemoteActorRefProvider"
case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider"
case fqcn ⇒ fqcn
}

  之前我们分析过,在创建provider过程中,是通过ProviderClass来判断具体是哪种模式的。从ProviderClass源码来看,当我们配置akka.actor.provider为remote时,会创建akka.remote.RemoteActorRefProvider的实例。我们知道ActorSystem在start时候会去调用provider.init方法进行初始化。

def init(system: ActorSystemImpl): Unit = {
local.init(system) actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache(system) remotingTerminator = system.systemActorOf(
remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)),
"remoting-terminator") val internals = Internals(
remoteDaemon = {
val d = new RemoteSystemDaemon(
system,
local.rootPath / "remote",
rootGuardian,
remotingTerminator,
_log,
untrustedMode = remoteSettings.UntrustedMode)
local.registerExtraNames(Map(("remote", d)))
d
},
transport =
if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match {
case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this)
case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = false)
case ArterySettings.TlsTcp ⇒ new ArteryTcpTransport(system, this, tlsEnabled = true)
}
else new Remoting(system, this)) _internals = internals
remotingTerminator ! internals _log = Logging.withMarker(eventStream, getClass.getName) // this enables reception of remote requests
transport.start() _remoteWatcher = createRemoteWatcher(system)
remoteDeploymentWatcher = createRemoteDeploymentWatcher(system)
}

  我们来结合RemoteActorRefProvider的构造函数和init函数来初步理解RemoteActorRefProvider的行为。首先在init方法的第一步就是调用local的init,通过local的类型我们发现这是一个LocalActorRefProvider,local的作用暂时不做分析,继续往下看。

  下面创建了ActorRefResolveThreadLocalCache对象,从ActorRefResolveThreadLocalCache的定义来看(这里就不再贴出相关代码),它是一个ThreadLocal变量,且是一个实现了Lru的缓存器,缓存的内容是ActorRef,具体作用也忽略。remotingTerminator的具体作用也不做深入分析。

private final case class Internals(transport: RemoteTransport, remoteDaemon: InternalActorRef)
extends NoSerializationVerificationNeeded

  Internals的定义还是值得一看的,它有两个变量,其中transport的值应该是new Remoting(system, this),remoteDaemon的值是RemoteSystemDaemon。然后调用了transport.start(),也就是Remoting的start。那么Remoting具体又是什呢?

// Start assumes that it cannot be followed by another start() without having a shutdown() first
override def start(): Unit = {
endpointManager match {
case None ⇒
log.info("Starting remoting")
val manager: ActorRef = system.systemActorOf(
configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
Remoting.EndpointManagerName)
endpointManager = Some(manager) try {
val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
manager ! Listen(addressesPromise) val transports: Seq[(AkkaProtocolTransport, Address)] = Await.result(
addressesPromise.future,
StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) transportMapping = transports.groupBy {
case (transport, _) ⇒ transport.schemeIdentifier
} map { case (k, v) ⇒ k → v.toSet } defaultAddress = transports.head._2
addresses = transports.map { _._2 }.toSet log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]")) manager ! StartupFinished
eventPublisher.notifyListeners(RemotingListenEvent(addresses)) } catch {
case e: TimeoutException ⇒
notifyError("Startup timed out. This is usually related to actor system host setting or host name resolution misconfiguration.", e)
throw e
case NonFatal(e) ⇒
notifyError("Startup failed", e)
throw e
} case Some(_) ⇒
log.warning("Remoting was already started. Ignoring start attempt.")
}
}

  在Remoting.start过程中,首先创建了EndpointManager,然后发送了一条Listen消息,并使用Await.result等待它的返回,然后又给EndpointManager发送了StartUpFinished。上面代码中的log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))还是值得我们关注的,毕竟我们启动remote模式的ActorSystem会经常看到这个日志信息。我们来看看EndpointManager收到Listen消息后做了哪些操作。

  那么listens又是什么呢?

private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
/*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
* like the following:
* AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
*
* The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances).
*/
val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) ← settings.Transports) yield { val args = Seq(classOf[ExtendedActorSystem] → context.system, classOf[Config] → config) // Loads the driver -- the bottom element of the chain.
// The chain at this point:
// Driver
val driver = extendedSystem.dynamicAccess
.createInstanceFor[Transport](fqn, args).recover({ case exception ⇒ throw new IllegalArgumentException(
s"Cannot instantiate transport [$fqn]. " +
"Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
"[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters", exception) }).get // Iteratively decorates the bottom level driver with a list of adapters.
// The chain at this point:
// Adapter <- ... <- Adapter <- Driver
val wrappedTransport =
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider }.foldLeft(driver) {
(t: Transport, provider: TransportAdapterProvider) ⇒
// The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])
} // Apply AkkaProtocolTransport wrapper to the end of the chain
// The chain at this point:
// AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
} // Collect all transports, listen addresses and listener promises in one future
Future.sequence(transports.map { transport ⇒
transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) }
})
}

  很明显这是一个transports集合,每个transports应该是一个AkkaProtocolTransport对象,AkkaProtocolTransport创建完成之后,调用了listen方法,最终返回AkkaProtocolTransport的列表。其实分析到这里我们可以不必再继续深入AkkaProtocolTransport的具体功能,从上面的官方注释以及我们的猜测来看,这大概是在初始化网络相关的对象。比如它可以是一个socket或者netty封装后的socket,是用来listen某个端口号,接收和发送数据的。

  当然RemoteActorRefProvider.init的最后两行分别创建了RemoteWatcher、RemoteDeploymentWatcher,这两个Actor的作用后面再具体分析。

  至此,remote模式下的初始化基本就算结束了,其实就是用RemoteActorRefProvider替换了LocalActorRefProvider,并完成了provider相关的初始化。remote模式与local模式下,ActorSystem初始化过程区别并不大,这还得多谢Akka框架封装的好。下一篇博客我们会分析actor的创建过程,毕竟在remote模式下,actor的创建过程还是有点不同的。

最新文章

  1. Oracle学习笔记五 SQL命令(三):Group by、排序、连接查询、子查询、分页
  2. Oracle启动脚本,开机自启动设置
  3. H5-杂七杂八的标签
  4. css3 forwards、backwards、both
  5. MVC 5.0(or5.0↓) Ajax.BeginForm 异步上传附件问题,答案是不能的!
  6. 2014年互联网IT待遇(包括国内民企、外企、金融机构)
  7. 转 常用JQuery插件整理
  8. 鼠标经过图片时变换的两种方法--css+div及javascript应用
  9. [LeetCode] K-th Symbol in Grammar 语法中的第K个符号
  10. 十分钟带你读懂《增长黑客》zz
  11. 网络编程懒人入门(六):深入浅出,全面理解HTTP协议
  12. 【thinkPHP框架】Failed opening required &#39;header.php&#39; include_path=&#39;.;c:\php5\pear 终级解决方案
  13. Django进阶知识
  14. Nginx详解二:Nginx基础篇之Nginx的优点
  15. sklearn 的train_test_split
  16. .NET 黑魔法 - asp.net core 配置文件的&quot;对象存储&quot;
  17. sql注入(一)
  18. Windows上安装tensorflow 详细教程
  19. If TransactionScope will close database connections
  20. 114. Flatten Binary Tree to Linked List -- 将二叉树转成链表(in-place单枝树)

热门文章

  1. &lt;MyBatis&gt;入门七 缓存机制
  2. exception对象的使用及常用方法
  3. 微信小程序官方指南手册,教你如何使用微信小程序!
  4. 关于jupyter notebook
  5. nyoj 911 Registration system(map)
  6. iOS攻城狮修炼之路
  7. windows开启3306端口并用可视化工具访问远程mysql(授权访问)
  8. java 源码分析2 -List
  9. 混合图(dizzy.pas/cpp/c)
  10. gh-ost: triggerless online schema migrations:Blog by Shlomi Noach: