topology在服务端提交过程中,会经过一系列的验证和初始化:TP结构校验、创建本地文件夹并拷贝序列化文件jar包、生成znode用于存放TP和task等信息,最后一步才进行任务分配。例如以下图:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbGlobTBfMQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />
提交主函数位于ServiceHandler.java中

private void makeAssignment(String topologyName, String topologyId,
TopologyInitialStatus status) throws FailedAssignTopologyException {
//1、创建topology的分配事件
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyName);
assignEvent.setOldStatus(Thrift
.topologyInitialStatusToStormStatus(status));
//2、丢入事件处理队列
TopologyAssign.push(assignEvent);
//3、等待时间返回
boolean isSuccess = assignEvent.waitFinish();
if (isSuccess == true) {
LOG.info("Finish submit for " + topologyName);
} else {
throw new FailedAssignTopologyException(
assignEvent.getErrorMsg());
}
}

这当中最基本的是事件丢入队列后兴许的处理过程。事件分配由TopologyAssign线程处理,这个线程的流程非常清晰,监听事件队列。一旦有事件进入,立即取出,进行doTopologyAssignment,例如以下:

public void run() {
LOG.info("TopologyAssign thread has been started");
runFlag = true; while (runFlag) {
TopologyAssignEvent event;
try {
event = queue.take();
} catch (InterruptedException e1) {
continue;
}
if (event == null) {
continue;
} boolean isSuccess = doTopologyAssignment(event); ..............
}

任务分配的核心代码位于TopologyAssign.java中

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); TopologyAssignContext context = prepareTopologyAssign(event); Set<ResourceWorkerSlot> assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers
.get(DEFAULT_SCHEDULER_NAME);
//開始进行作业的调度
assignments = scheduler.assignTasks(context); } else {
assignments = mkLocalAssignment(context);
}
............
}

调用栈例如以下:

分配原理是首先获得全部可用的supervisor,推断supervisor可用的标准是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然后计算出须要分配几个woker。由于一个woker相应一个port,当然这些信息的採集都是来自Zookeeper,如今我们来分析分配的核心代码:
WorkerMaker.java
//注意參数,result是这个作业须要的槽位。传入前仅仅知道须要槽位的数量,详细分配到哪台supervisor上还没指定
//supervisors指当前集群中全部可用的supervisor。即有空暇port的

private void putWorkerToSupervisor(List<ResourceWorkerSlot> result,
List<SupervisorInfo> supervisors) {
int key = 0;
//按所需槽位遍历,每次分配一个
for (ResourceWorkerSlot worker : result) {
//首先进行必要的推断和置位
if (supervisors.size() == 0)
return;
if (worker.getNodeId() != null)
continue;
if (key >= supervisors.size())
key = 0;
//1、取出第一个supervisor
SupervisorInfo supervisor = supervisors.get(key);
worker.setHostname(supervisor.getHostName());
worker.setNodeId(supervisor.getSupervisorId());
worker.setPort(supervisor.getWorkerPorts().iterator().next());
//槽位用完则从集合中删除,不再參与分配
supervisor.getWorkerPorts().remove(worker.getPort());
if (supervisor.getWorkerPorts().size() == 0)
supervisors.remove(supervisor);
//当一个supervisor分配完后便不再使用。除非supervisor不够用
key++;
}
}

从上面的代码中我们能够看到,眼下槽位分配没考虑机器负载,槽位的分配并不一定平均,比方第一个supervisor有10个槽位,剩下的supervisor仅仅有两个,那么还是要每一个supervisor分配一个woker的。

注意一个问题,在上面代码中supervisors这个集合是经过排序的,排序规则例如以下:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result,
List<SupervisorInfo> supervisors) {
...........
supervisors = this.getCanUseSupervisors(supervisors);
Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override
public int compare(SupervisorInfo o1, SupervisorInfo o2) {
// TODO Auto-generated method stub
return -NumberUtils.compare(o1.getWorkerPorts().size(), o2
.getWorkerPorts().size());
} });
this.putWorkerToSupervisor(result, supervisors);
.............
}

能够看到。当前排序规则是按slot多少的,我们兴许版本号中可能会考虑机器负载的一些因素吧。

最新文章

  1. MySQL备忘
  2. tagfield
  3. Ajax与用户交互的存储格式JSON
  4. eclipse使用jetty插件出现内存溢出解决方案
  5. 【Linux】gdb调试core文件
  6. iosblock用法
  7. 【Hadoop学习】CDH5.2安装部署
  8. 一段网上java常见escape和unescape方法的BUG
  9. Destoon标签使用技巧十则
  10. perl访问数组中变量
  11. win server2012 r2 服务器共享文件夹设置
  12. json小结和fastjson包的实际json操作
  13. motan负载均衡/zookeeper集群/zookeeper负载均衡的关系
  14. ConcurrentLinkedQueue简介
  15. 六、web应用与Tomcat
  16. Redis不支持ssl
  17. 血红蛋白值的临床意义(hemoglobin ,Hb,HGB)
  18. js中的变量作用域问题
  19. uva-10282-枚举
  20. TZOJ 2725 See you~(二维树状数组单点更新区间查询)

热门文章

  1. mybatis使用generator自己主动生成代码时的类型转换
  2. [实例]ROS使用OpenCV读取图像并发布图像消息在rviz中显示
  3. 数据库 The Network Adapter could not establish the connection解决方案
  4. 基础apache命令
  5. Spark SQL 编程API入门系列之SparkSQL的依赖
  6. C#---爬虫抓取系列
  7. 自定义TempData跨平台思路
  8. Web前端必须规避的8个误区
  9. 设置cookie,删除cookie,读取cookie
  10. 通过obs进行推流