一个topology的启动包括了三个步骤

1)创建TopologyBuilder,设置输入源,输出源

2)获取config

3)提交topology(这里不考虑LocalCluster本地模式)

以storm.starter的ExclamationTopology为例:

public static void main(String[] args)throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word", new TestWordSpout(), 10);

builder.setBolt("exclaim1", new ExclamationBolt(), 3)

.shuffleGrouping("word");

builder.setBolt("exclaim2", new ExclamationBolt(), 2)

.shuffleGrouping("exclaim1");

Config conf = new Config();

conf.setDebug(true);

if(args!=null && args.length > 0) {

conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} else {

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf,builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("test");

cluster.shutdown();

}

}

TopologyBuilder

TopologyBuilder builder = newTopologyBuilder();

builder.setSpout("word", new TestWordSpout(), 10);

builder.setBolt("exclaim1", new ExclamationBolt(), 3)

.shuffleGrouping("word");

builder.setBolt("exclaim2", new ExclamationBolt(), 2)

.shuffleGrouping("exclaim1");

每个topology都是先创建了一个TopologyBuilder对象,之后才可以进行设置相关属性。

先来看看setSpout函数:

public SpoutDeclarer setSpout(String id, IRichSpout spout, Numberparallelism_hint) {

validateUnusedId(id);

initCommon(id, spout, parallelism_hint);

_spouts.put(id, spout);

return new SpoutGetter(id);

}

这个函数的处理的流程是,先检查下componentId  是否已经被使用过。要是被使用过则抛出IllegalArgumentException 异常。

然后开始初始化ComponentCommon,这个结构是storm.thrift中定义的(thrift会生成对应的java类),定义如下:

struct ComponentCommon {

1: required map<GlobalStreamId, Grouping> inputs;

2: required map<string, StreamInfo> streams; //key is stream id

3: optional i32 parallelism_hint; //how many threads across the clustershould be dedicated to this component

// component specific configuration

4: optional string json_conf;

}

初始化ComponentCommon对象,最后会记录到TopologyBuilder 的成员变量Map<String, ComponentCommon> _commons。其中key是componentId,在这里就是"word"。

然后在TopologyBuilder 的成员变量Map<String, IRichSpout> _spouts,记录下spout的记录。其中key也是componentId,在这里是"word"。

builder.setBolt("exclaim1",new ExclamationBolt(), 3)这部分和setSpout基本相似,只是最后记bolt的时候,是记录在TopologyBuilder 的成员变量Map<String, IRichBolt> _bolts,其中key是componentId,在这里就是"exclaim1"。之后,.shuffleGrouping("word")这部分,是调用setBolt返回的,BoltDeclarer中的shuffleGrouping。

最终将会调用到grouping,其中streamId在这里没有指定,会使用"default"来替代。

public BoltDeclarer shuffleGrouping(StringcomponentId) {

return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

public BoltDeclarer shuffleGrouping(StringcomponentId, String streamId) {

return grouping(componentId, streamId, Grouping.shuffle(newNullStruct()));

}

在这里grouping最后一个参数是生成了Grouping对象,并填充shuffle为NullStruct,其中Grouping是在storm.thrift定义的一个联合体,thrift会生成对应的java代码,内部定义了很多种grouping的方式。

private BoltDeclarer grouping(StringcomponentId, String streamId, Grouping grouping)

{

_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId,streamId), grouping);

return this;

}

grouing函数是将之前记录在_commons中的,bolt的componentId对应的ComponentCommon的键值对,取出来设置ComponentCommon中的inputs的值。以第一个setBolt为例,就是取出"exclaim1"这个componentId对应的ComponentCommon,将里面的inputs设置为,这个输入是从哪里来的,也就是"word"这个componentId,streamId为"default"的这个spout流作为第一个bolt的输入源。

获取Config

Config比较简单,内部定义了很多key的记录,并且这个Config是从Map派生过来的,因此Config就是一个Map,在之后插入记录,就插入自身当中就可以。

在这个例子中有两个set函数的调用。

conf.setDebug(true);就是在Map中插入一条记录("topology.debug" -> "true"),用来标记是打开debug模式的。

conf.setNumWorkers(3);同样在Map中插入一条记录("topology.workers" -> 3),用来增加一条配置项,标记worker数为3个。

提交Topology

在看看最后一部分,如何生成topology和提交的:

StormSubmitter.submitTopology(args[0],conf, builder.createTopology())。

生成StormTopology

builder.createTopology()会利用从之前设置的TopologyBuilder对象去生成一个topology。

public StormTopology createTopology() {

Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();

Map<String, SpoutSpec> spoutSpecs = new HashMap<String,SpoutSpec>();

for(String boltId: _bolts.keySet()) {

IRichBolt bolt = _bolts.get(boltId);

ComponentCommon common = getComponentCommon(boltId, bolt);

boltSpecs.put(boltId,\

newBolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));

}

for(String spoutId: _spouts.keySet()) {

IRichSpout spout = _spouts.get(spoutId);

ComponentCommon common = getComponentCommon(spoutId, spout);

spoutSpecs.put(spoutId, \

newSpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

}

return new StormTopology(spoutSpecs,

boltSpecs,

newHashMap<String, StateSpoutSpec>());

}

创建一个topology的流程是:

1)首先,所有bolt取出来(setBolt设置),每个对应一个Bolt插入到BoltSpec。具体来说是将所有在setBolt阶段记录的componentId->IRichBolt记录取出来,从_commons记录的componentId->ComponentCommon中查找到对应的ComponentCommon,通过深拷贝生成一个新的ComponentCommon并设置相关值并返回。插入一条关于bolt的记录到boltSpecs中,其中key是在之前setBolt传入的componentId,value是一个Bolt对象。注意,这个Bolt不是setBolt时传递进来的那个Bolt(传递进来的是一个IRichBolt),而是storm.thrift中定义的Bolt,在Bolt内部有会有两个值,这两个值的类型分别是ComponentObject和ComponentCommon,其中这个ComponentCommon返回的那个。

boltSpecs可以理解为内部有很多条componentId->(序列化后的bolt+ComponentCommon)。

2)然后生成所有的spout对应SpoutSpec的记录插入到spoutSpecs。具体过程基本一样,只是spoutSpecs中的value是storm.thrift中定义的SpoutSpec。但从结构来看Bolt和SpoutSpec内部都是有两个成员变量,分别是 ComponentObject bolt_object和ComponentCommoncommon。

3)最后是生成一个StormTopology,并返回。StormTopology也是storm.thrift中定义的一个struct结构,定义如下:

struct StormTopology {

//ids must be unique across maps

//#workers to use is in conf

1:required map<string, SpoutSpec> spouts;

2:required map<string, Bolt> bolts;

3:required map<string, StateSpoutSpec> state_spouts;

}

因此StormTopology对象的构造函数只是简单的将前面生成的boltSpecs,spoutSpecs以及一个新的_stateSpouts对象,赋给StormTopology内部对应的变量。

通过上述步骤就生成了一个StormTopology。

另外还需要单独将getComponentCommon拿出来说一说,上面说的只是一个大概含义。

先看看这个函数

private ComponentCommongetComponentCommon(String id, IComponent component) {

ComponentCommon ret = new ComponentCommon(_commons.get(id));

OutputFieldsGetter getter = new OutputFieldsGetter();

component.declareOutputFields(getter);

ret.set_streams(getter.getFieldsDeclaration());

return ret;

}

第一句就是之前说的,从_commons记录中取一个componentId对应的ComponentCommon,并赋给新的ComponentCommon,这里的赋值是通过deepcopy完成的。

后面的,就和用户编写topology有关系了。OutputFieldsGetter是实现了OutputFieldsDeclarer接口,内部有declare接口供编写bolt或者spout时调用,用来申明stream信息,这些信息都会记录在内部的Map<String, StreamInfo> _field,如果没有在申明是指定streamId,那么streamId都会采用默认的"default"作为streamId。

bolt和spout最终都会实现IComponent的declareOutputFields接口。在这个接口内部会申明下这个流有什么字段之类的信息。以ExclamationTopology例子中的ExclamationBolt为例,在declareOutputFields内部会去申明这个流有一个字段叫做word,declareOutputFields的参数就是上面说的OutputFieldsGetter实例。

public static class ExclamationBolt extendsBaseRichBolt {

...

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

再取到OutputFieldsGetter中记录stream对应信息的_fields,赋值给ComponentCommon中的变量stream。这样,ComponentCommon的记录信息就完整了(bolt会记录inputs信息,而spout不会记录)。

submitTopology

提交到集群中,会调用StormSubmitter中的静态方法:

public static void submitTopology(Stringname, Map stormConf, StormTopology topology) throws AlreadyAliveException,InvalidTopologyException {

submitTopology(name, stormConf, topology, null);

}

参数name表示要运行的topology的名字。分布式模式时,name就是jar包中的入口类的名字,本地模式时name可以任意一个可以用来表示topology的名字。

内部将会调用真正提交topology的函数。

public static void submitTopology(Stringname, Map stormConf, StormTopology topology, SubmitOptions opts) throwsAlreadyAliveException, InvalidTopologyException {

if(!Utils.isValidConf(stormConf)) {

throw new IllegalArgumentException("Storm conf is not valid. Mustbe                                           json-serializable");

}

stormConf = new HashMap(stormConf);

stormConf.putAll(Utils.readCommandLineOpts());

Map conf = Utils.readStormConfig();

conf.putAll(stormConf);

try {

String serConf = JSONValue.toJSONString(stormConf);

if(localNimbus!=null) {

LOG.info("Submittingtopology " + name + " in local mode");

localNimbus.submitTopology(name, null, serConf, topology);

} else {

NimbusClient client =NimbusClient.getConfiguredClient(conf);

if(topologyNameExists(conf,name)) {

throw newRuntimeException("Topology with name `" + name + "`                                                    alreadyexists on cluster");

}

submitJar(conf);

try {

LOG.info("Submitting topology" +  name + " in distributedmode with                                            conf" + serConf);

if(opts!=null) {

client.getClient().submitTopologyWithOpts(name, submittedJar,                                                       serConf,topology, opts);

} else {

// this is forbackwards compatibility

client.getClient().submitTopology(name, submittedJar, serConf,                                                        topology);

}

}catch(InvalidTopologyException e) {

LOG.warn("Topologysubmission exception", e);

throw e;

} catch(AlreadyAliveExceptione) {

LOG.warn("Topologyalready alive exception", e);

throw e;

} finally {

client.close();

}

}

LOG.info("Finished submitting topology: " +  name);

} catch(TException e) {

throw new RuntimeException(e);

}

}

正在去做提交topoloy这个动作的流程是:

1)校验传进来的配置合法性,并读取default.yaml和storm.yaml。这个过程在分析nimbus启读取配置时有分析过,这里就不再累赘了。

2)获取Config.NIMBUS_HOST和Config.NIMBUS_THRIFT_PORT值,创建NimbusClient。在内部是封装了访问Nimbus这个rpc server(基于thrift)的rpc client,在NimbusClient构造时,就创建了rpcclient并建立与rpc server的连接。

3)检测现在准备提交的topology是否和集群中正在运行的topology名字有冲突。检测过程很简单,通过rpc调用从Nimbus服务器上取下集群的信息。集群的信息里面会记录supervisor的统计信息,nimbus更新时间,topology的统计信息。在topology的统计信息中会记录topology的名字。因此只要遍历比较下就能知道名字是否会冲突。

在storm.thrift中定义的集群信息结构如下:

struct ClusterSummary {

1:required list<SupervisorSummary> supervisors;

2:required i32 nimbus_uptime_secs;

3:required list<TopologySummary> topologies;

}

4) submitJar 上传jar文件。使用bin/stormjar xxx.jar xxx.class arg1 args2运行topology的时候,

bin/storm这个python脚本中,最后会使用"java-Dstorm.jar=xxx.jar ..."命名运行jar中指定的xxx.class中的main函数。submitJar首先会从环境变量中读取到storm.jar这个变量的值,如果没有设置过,则抛出异常。否则,会创建连接到Nimbus的rpc client,调用beginFileUpload通知要上传文件,Nimbus会返回一个上传的路径,之后分段读取jar文件,调用uploadChunk上传到nimbus所告知的那个路径,jar文件数据都上传完毕调用finishFileUpload告知nimbus,对那个路径的文件已上传完毕。其中beginFileUpload,uploadChunk,finishFileUpload都是storm.thrift定义的service Nimbus中的方法,其中Nimbus.Iface是在 Nimbus.clj被实现了。

5)成功上传jar文件后,会用第二步中创建的rpcclient调用Nimbus上的submitTopology方法,这个方法也是在storm.thrft中service Nimbus。调用这个Nimbus上的这个方法可以理解为通知Nimbus去运行这个topology。通知的时候,会带上name,这个name就是jar的入口类的名字。

这样经过上述5步后,一个编写有topology任务的jar文件就上传提交到nimbus,并可以由nimbus将任务分发给supervisors去执行这个topology。

最新文章

  1. Problems about trees
  2. 用Laravel+Grunt+Bower管理你的应用
  3. Android部分调试开关
  4. color the python console text
  5. c# 模拟表单提交,post form 上传文件、大数据内容
  6. MFC可编辑的ListCtrl
  7. Android应用程序开发之图片操作(一)——Bitmap,surfaceview,imageview,Canvas
  8. INFORMATION_SCHEMA.COLUMNS 查询表字段语句
  9. 阅读UML类图和时序图
  10. Unity 3D Framework Designing(8)——使用ServiceLocator实现对象的注入
  11. 201521123025《java程序设计》第12周学习总结
  12. ASP.NET Core 2.1与2.2 SignalR CORS 跨域问题
  13. PDF 补丁丁 0.6.0.3355 版发布(修复阅读模式、书签缩放的问题)
  14. 我靠,上班eclipse看糗事百科
  15. 外购半成品报SHORT问题(验货客户)
  16. JavaScript判断浏览器及其版本信息
  17. jenkins中管理用户
  18. 摘:ClickOnce部署
  19. linux安装ant
  20. 转:windows 下 netsh 实现 端口映射(端口转发)

热门文章

  1. 【QTP专题】02_时间同步点问题
  2. kvm虚拟机静态迁移
  3. MySQL 5.7.16 在CentOS 6.5 x64 安装
  4. [Flex] 修改注释中的@author方法
  5. jquery源码解析:jQuery延迟对象Deferred(工具方法)详解2
  6. Python环境安装与升级
  7. web 应用的部署
  8. 数据库表的约束constraints
  9. Python爬虫常用之登录(三) 使用http请求登录
  10. EntityFramework First,FirstOrDefault,Single,SingleOrDefault的区别