Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt、bolt和bolt之间的关系。它能够被提交到Jstorm集群。

本文以Jstorm自带的SequenceTopology简介一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及详细业务,

1、 SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf)。该方法主要依据配置文件,使用TopologyBuilder构造Topology的spout和bolt。以及spout和bolt之间的关系

2、TopologyBuilder构造好Topology之后,通过Jstorm Client的StormSubmitter.submitTopology(streamName, conf,builder.createTopology())提交Topology到Jstorm集群,

3、在StormSubmitter.submitTopology方法中。首先会对配置项进行检查、然后将Topology自己的配置项和Jstorm的配置项组装成一个大的Map。之后上传用户在命令行提交的Jar包,然后通过NimbusClient 的submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) 方法将Topology提交到Jstorm集群,其核心代码例如以下:

if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException(
"Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
putUserInfo(conf, stormConf);
String serConf = JSON.toJSONString(stormConf);
if (localNimbus != null) {
localNimbus.submitTopology(name, null, serConf, topology);
} else {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
if (topologyNameExists(conf, name)) {//检查名字是否反复,Jstorm要求每一个topology名称必须唯一
throw new RuntimeException("Topology with name `" + name
+ "` already exists on cluster");
}
submitJar(conf);//上传Jar包到ZK
client.getClient().submitTopologyWithOpts(name, path,
serConf, topology, opts);//通过Thrift将topology提交到集群

4、NimbusClient提交之后,NimbusSever通过com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)处理接收到的topology。其详细逻辑例如以下(代码已经精简)

public void submitTopologyWithOpts(String topologyname,
String uploadedJarLocation, String jsonConf,
StormTopology topology, SubmitOptions options)
throws AlreadyAliveException, InvalidTopologyException,
TopologyAssignException, TException {
//首先检查topology是否已经存在
checkTopologyActive(data, topologyname, false);
//生成topology的唯一标识
int counter = data.getSubmittedCount().incrementAndGet();
String topologyId = topologyname + "-" + counter + "-"
+ TimeUtils.current_time_secs();
try {
//反序列化topology配置项
Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
.from_json(jsonConf);
if (serializedConf == null) {
LOG.warn("Failed to serialized Configuration");
throw new InvalidTopologyException(
"Failed to serilaze topology configuration");
}
//将topology的名称和ID添加到配置项中
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
Map<Object, Object> stormConf;
stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
topology);
Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
conf);
totalStormConf.putAll(stormConf);
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
stormConf, topology); // this validates the structure of the topology
Common.validate_basic(normalizedTopology, totalStormConf,
topologyId);
// don't need generate real topology, so skip Common.system_topology
// Common.system_topology(totalStormConf, topology);
StormClusterState stormClusterState = data.getStormClusterState();
// 创建topology在ZK上的文件夹
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
normalizedTopology); // 为每个spout或者bolt生成Task,并在ZK上创建对应的task文件夹<span style="font-family: Arial, Helvetica, sans-serif;">/ZK/tasks/topoologyId/xxx</span>
setupZkTaskInfo(conf, topologyId, stormClusterState);
// 进行任务分配
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyname);
assignEvent.setOldStatus(Thrift
.topologyInitialStatusToStormStatus(options
.get_initial_status())); TopologyAssign.push(assignEvent);
LOG.info("Submit for " + topologyname + " with conf "
+ serializedConf); boolean isSuccess = assignEvent.waitFinish();
if (isSuccess == true) {
LOG.info("Finish submit for " + topologyname);
}

最新文章

  1. 只写104行代码!在nopCommerce中如何实现自动生成网站地图
  2. HTML&amp;CSS----练习隐藏导航栏(三级导航)
  3. 织梦cms PHPcms 帝国cms比较
  4. 既约分数-phi
  5. html+CSS--水平居中设置(定宽块状元素)
  6. HttpServletRequestWrapper的使用
  7. C# 调用AForge类库操作摄像头
  8. WPF:如何实现单实例的应用程序(Single Instance)
  9. C++之类和对象——C++ primer plus学习(一)
  10. WebService推送数据,数据结构应该怎样定义?
  11. springCloud项目练习
  12. HttpClient4 TIME_WAIT和CLOSE_WAIT
  13. 解决:在微信中访问app下载链接提示“已停止访问该网页”
  14. Debian 无线网络切换问题解决方案
  15. Joone
  16. android 开发设计模式---Strategy模式
  17. 分布式系统-主键唯一id,订单编号生成-雪花算法-SnowFlake
  18. 基于WebSocket 私聊、ws_session、httpsession
  19. [.NET] 一个获取随机数的新方式
  20. poj 1873

热门文章

  1. python下载文件(图片)源码,包含爬网内容(爬url),可保存cookie
  2. Matlab中S函数建立与应用
  3. 网络授时服务 NTP
  4. Integral Promotions整数提升和符号扩展(char,unsigned char,signed char)
  5. Ext的异步请求(二级级联动态加载下拉列表)
  6. 四大流行的jdbc连接池之C3P0篇
  7. 在uboot里面加入环境变量使用run来运行
  8. Oracle查看表空间使用情况
  9. 新书:《Liferay Portal 6.1最佳实践门户网站建设》
  10. Google - Pagerank