[源码解析] Flink 的slot究竟是什么?(2)

0x00 摘要

Flink的Slot概念大家应该都听说过,但是可能很多朋友还不甚了解其中细节,比如具体Slot究竟代表什么?在代码中如何实现?Slot在生成执行图、调度、分配资源、部署、执行阶段分别起到什么作用?本文和上文将带领大家一起分析源码,为你揭开Slot背后的机理。

0x01 前文回顾

书接上回 [源码解析] Flink 的slot究竟是什么?(1)。前文中我们已经从系统架构和数据结构角度来分析了Slot,本文我们将从业务流程角度来分析Slot。我们重新放出系统架构图

和数据结构逻辑关系图

下面我们从几个流程入手一一分析。

0x02 注册/更新Slot

有两个途径会注册Slot/更新Slot状态。

  • 当TaskExecutor注册成功之后会和RM交互进行注册时,一并注册Slot;
  • 定时心跳时,会在心跳payload中附加Slot状态信息;

2.1 TaskExecutor注册成功

当TaskExecutor注册成功之后会和RM交互进行注册。会通过如下的代码调用路径来向ResourceManager(SlotManagerImpl)注册Slot。SlotManagerImpl 在获取消息之后,会更新Slot状态,如果此时已经有如果有pendingSlotRequest,就直接分配,否则就更新freeSlots变量。

  • TaskExecutor#establishResourceManagerConnection;

  • TaskSlotTableImpl#createSlotReport;建立 report

    • 这时候的 report如下:

      slotReport = {SlotReport@9633} 
      
        0 = {SlotStatus@8969} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
      slotID = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
      resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
      allocationID = null
      jobID = null 1 = {SlotStatus@9638} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
      slotID = {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1"
      resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
      allocationID = null
      jobID = null
  • ResourceManager#sendSlotReport;通过RPC(resourceManagerGateway.sendSlotReport)调用到RM

  • SlotManagerImpl#registerTaskManager;把TaskManager注册到SlotManager

  • SlotManagerImpl#registerSlot;

  • SlotManagerImpl#createAndRegisterTaskManagerSlot;生成注册了TaskManagerSlot

    • 这时候代码 & 变量如下,我们可以看到,就是把TM的Slot信息注册到SlotManager中

      private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
      final TaskManagerSlot slot = new TaskManagerSlot(
      slotId, resourceProfile, taskManagerConnection);
      slots.put(slotId, slot);
      return slot;
      } slot = {TaskManagerSlot@13322}
      slotId = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
      resourceProfile = {ResourceProfile@4194}
      cpuCores = {CPUResource@11616} "Resource(CPU: 89884656743115785...0)"
      taskHeapMemory = {MemorySize@11617} "4611686018427387903 bytes"
      taskOffHeapMemory = {MemorySize@11618} "4611686018427387903 bytes"
      managedMemory = {MemorySize@11619} "64 mb"
      networkMemory = {MemorySize@11620} "32 mb"
      extendedResources = {HashMap@11621} size = 0
      taskManagerConnection = {WorkerRegistration@11121}
      allocationId = null
      jobId = null
      assignedSlotRequest = null
      state = {TaskManagerSlot$State@13328} "FREE"
  • SlotManagerImpl#updateSlot

  • SlotManagerImpl#updateSlotState;如果有pendingSlotRequest,就直接分配

  • SlotManagerImpl#handleFreeSlot;否则就更新freeSlots变量

流程结束后,SlotManager如下,可以看到此时slots个数是两个,freeSlots也是两个,说明都是空闲的:

this = {SlotManagerImpl@11120}
scheduledExecutor = {ActorSystemScheduledExecutorAdapter@11125}
slotRequestTimeout = {Time@11127} "300000 ms"
taskManagerTimeout = {Time@11128} "30000 ms"
slots = {HashMap@11122} size = 2
{SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
{SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
freeSlots = {LinkedHashMap@11129} size = 2
{SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
{SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
taskManagerRegistrations = {HashMap@11130} size = 1
fulfilledSlotRequests = {HashMap@11131} size = 0
pendingSlotRequests = {HashMap@11132} size = 0
pendingSlots = {HashMap@11133} size = 0
slotMatchingStrategy = {AnyMatchingSlotMatchingStrategy@11134} "INSTANCE"
slotRequestTimeoutCheck = {ActorSystemScheduledExecutorAdapter$ScheduledFutureTask@11139}

2.2 心跳机制更新Slot状态

Flink的心跳机制也会被利用来进行Slots信息的汇报,Slot Report被包括在心跳payload中。

首先在 TE 中建立Slot Report

  • TaskExecutor#heartbeatFromResourceManager
  • HeartbeatManagerImpl#requestHeartbeat
  • TaskExecutor$ResourceManagerHeartbeatListener # retrievePayload
  • TaskSlotTableImpl # createSlotReport

程序运行到 RM,于是 SlotManagerImpl 调用到 reportSlotStatus,进行Slot状态更新。

  • ResourceManager#heartbeatFromTaskManager

  • HeartbeatManagerImpl#receiveHeartbeat

  • ResourceManager$TaskManagerHeartbeatListener#reportPayload

  • SlotManagerImpl#reportSlotStatus,此时的SlotReport如下:

    • slotReport = {SlotReport@8718}
      slotsStatus = {ArrayList@8717} size = 2
      0 = {SlotStatus@9025} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
      slotID = {SlotID@9032} "d99e16d7-a30c-4e21-b270-f82884b1813f_0"
      resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
      allocationID = null
      jobID = null
      1 = {SlotStatus@9026} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
      slotID = {SlotID@9029} "d99e16d7-a30c-4e21-b270-f82884b1813f_1"
      resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
      allocationID = null
      jobID = null
  • SlotManagerImpl#updateSlot

  • SlotManagerImpl#updateSlotState;如果有pendingSlotRequest,就直接分配

  • SlotManagerImpl#handleFreeSlot;否则就更新freeSlots变量

    • freeSlots.put(freeSlot.getSlotId(), freeSlot);

0x03 生成ExecutionGraph阶段

当Job提交之后,经过一系列处理,Scheduler会建立ExecutionGraph。ExecutionGraph 是 JobGraph 的并行版本。而通过一系列的分析,才可以最终把任务分发到相关的任务槽中。槽会根据CPU的数量提前指定出来,这样可以最大限度的利用CPU的计算资源。如果Slot耗尽,也就意味着新分发的作业任务是无法执行的。

ExecutionGraphJobManager根据JobGraph生成的分布式执行图,是调度层最核心的数据结构。

一个JobVertex / ExecutionJobVertex代表的是一个operator,而具体的ExecutionVertex则代表了一个Task。

在生成StreamGraph时候,StreamGraph.addOperator方法就已经确定了operator是什么类型,比如OneInputStreamTask,或者SourceStreamTask等。

假设OneInputStreamTask.class即为生成的StreamNode的vertexClass。这个值会一直传递,当StreamGraph被转化成JobGraph的时候,这个值会被传递到JobVertex的invokableClass。然后当JobGraph被转成ExecutionGraph的时候,这个值被传入到ExecutionJobVertex.TaskInformation.invokableClassName中,最后一直传到Task中。

本系列代码执行序列如下:

  • JobMaster#createScheduler

  • DefaultSchedulerFactory#createInstance

  • DefaultScheduler#init

  • SchedulerBase#init

  • SchedulerBase#createAndRestoreExecutionGraph

  • SchedulerBase#createExecutionGraph

  • ExecutionGraphBuilder#buildGraph

  • ExecutionGraph#attachJobGraph

  • ExecutionJobVertex#init,这里根据并行度来确定要建立多少个Task,即多少个ExecutionVertex。

    • int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
      this.taskVertices = new ExecutionVertex[numTaskVertices];
  • ExecutionVertex#init,这里会生成Execution。

    • this.currentExecution = new Execution(
      getExecutionGraph().getFutureExecutor(),
      this, 0, initialGlobalModVersion, createTimestamp, timeout);

0x04 调度阶段

任务的流程就是通过作业分发到TaskManager,然后再分发到指定的Slot进行执行。

这部分调度阶段的代码只是利用CompletableFuture把程序执行架构搭建起来,可以把认为是自顶之下进行操作

Job开始调度之后,代码执行序列如下:

  • JobMaster#startJobExecution

  • JobMaster#resetAndStartScheduler

  • Future操作

  • JobMaster#startScheduling

  • SchedulerBase#startScheduling

  • DefaultScheduler#startSchedulingInternal

  • LazyFromSourcesSchedulingStrategy#startScheduling,这里开始针对Vertices进行资源分配和部署

    • allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
  • LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices,这里会遍历ExecutionVertex,筛选出Create状态的 & 输入Ready的节点。

    • private void allocateSlotsAndDeployExecutionVertices(
      final Iterable<? extends SchedulingExecutionVertex<?, ?>> vertices) {
      // 取出状态是CREATED,且输入Ready的 ExecutionVertex
      final Set<ExecutionVertexID> verticesToDeploy = IterableUtils.toStream(vertices)
      .filter(IS_IN_CREATED_EXECUTION_STATE.and(isInputConstraintSatisfied()))
      .map(SchedulingExecutionVertex::getId)
      .collect(Collectors.toSet());
      // 根据 ExecutionVertex 建立 DeploymentOption
      final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions = ...;
      // 分配资源并且部署
      schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
      }
  • DefaultScheduler#allocateSlotsAndDeploy

这里来到了本文第一个关键函数 allocateSlotsAndDeploy。其主要功能是:

  1. allocateSlots分配Slot,其实这时候并没有分配,而是建立一系列Future,然后根据Future返回SlotExecutionVertexAssignment列表。
  2. 根据SlotExecutionVertexAssignment建立DeploymentHandle
  3. 根据deploymentHandles进行部署,其实是根据Future把部署搭建起来,具体如何部署需要在slot分配成功之后再执行。
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
validateDeploymentOptions(executionVertexDeploymentOptions); final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions); final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()); final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
executionVertexVersioner.recordVertexModifications(verticesToDeploy); transitionToScheduled(verticesToDeploy); // 分配Slot,其实这时候并没有分配,而是建立一系列Future,然后根据Future返回SlotExecutionVertexAssignment列表
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
allocateSlots(executionVertexDeploymentOptions); // 根据SlotExecutionVertexAssignment建立DeploymentHandle
final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
requiredVersionByVertex,
deploymentOptionsByVertex,
slotExecutionVertexAssignments); // 根据deploymentHandles进行部署,其实是根据Future把部署搭建起来,具体如何部署需要在slot分配成功之后再执行
if (isDeployIndividually()) {
deployIndividually(deploymentHandles);
} else {
waitForAllSlotsAndDeploy(deploymentHandles);
}
}

接下来 两个小章节我们分别针对 allocateSlots 和 deployIndividually / waitForAllSlotsAndDeploy 进行分析。

0x05 分配资源阶段

注意,此处的入口为 allocateSlotsAndDeploy 的allocateSlots 调用

在分配slot时,首先会在JobMaster中SlotPool中进行分配,具体是先SlotPool中获取所有slot,然后尝试选择一个最合适的slot进行分配,这里的选择有两种策略,即按照位置优先和按照之前已分配的slot优先;若从SlotPool无法分配,则通过RPC请求向ResourceManager请求slot,若此时并未连接上ResourceManager,则会将请求缓存起来,待连接上ResourceManager后再申请。

5.1 CompletableFuture

CompletableFuture 首先是一个 Future,它拥有 Future 所有的功能,包括取得异步执行结果,取消正在执行的任务等,其次是 一个CompleteStage,其最大作用是将回调改为链式调用,从而将 Future 组合起来。

此处生成了执行框架,即通过三个 CompletableFuture 构成了执行框架

我们按照出现顺序命名为 Future 1,Future 2,Future 3。

但是这个反过来说明反而更方便。我们可以看到,'

出现次序是 Future 1,Future 2,Future 3

调用顺序是 Future 3 ---> Future 2 ---> Future 1

5.1.1 Future 3

我们可以称之为 PhysicalSlot Future

类型是:CompletableFuture

生成在:requestNewAllocatedSlot 函数中对 PendingRequest 的生成。PendingRequest 的构造函数中有 new CompletableFuture<>(),这个 Future 3 是 PendingRequest 的成员变量。

用处是:

  • PendingRequest 会 加入到 waitingForResourceManager

回调函数作用是:

  • 在 allocateMultiTaskSlot 的 whenComplete 会把payload赋值给slot,allocatedSlot.tryAssignPayload
  • 进一步回调在 createRootSlot 函数 的 forward . thenApply 语句,会 设置为 Future 3 回调 Future 2 的回调函数

何时回调

  • TM,TE offer Slot的时候,会根据 PendingRequest 间接回调到这里

6.1.2 Future 2

我们可以称之为 allocationFuture

类型是:

  • CompletableFuture ,CompletableFuture 有类型转换

生成在:

  • createRootSlot函数中。final CompletableFuture slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();

用处是:

  • 把 Future 2 设置为 multiTaskSlot 的成员变量 private final CompletableFuture<? extends SlotContext> slotContextFuture;
  • Future 2 其实也就是 SingleTaskSlot 的 parent.getSlotContextFuture(),因为 multiTaskSlot 和 SingleTaskSlot 是父子关系
  • 在 SingleTaskSlot 构造函数 中,Future 2 会赋值给 SingleTaskSlot 的成员变量 singleLogicalSlotFuture。
  • 即 Future 2 实际上是 SingleTaskSlot 的成员变量 singleLogicalSlotFuture
  • SchedulerImpl # allocateSharedSlot 函数,return leaf.getLogicalSlotFuture(); 会被返回 singleLogicalSlotFuture 给外层调用,就是外层看到的 allocationFuture。

回调函数作用是:

  • 在 SingleTaskSlot 构造函数 中,会生成一个 SingleLogicalSlot(未来回调时候会真正生成 )
  • 在 internalAllocateSlot 函数中,会回调 Future 1,allocationResultFuture的回调函数

何时回调

  • 被 Future 3 的回调函数调用

6.1.3 Future 1

我们可以称之为 allocationResultFuture

类型是:

  • CompletableFuture

生成在:

  • SchedulerImpl#allocateSlotInternal,这里生成了第一个 CompletableFuture

用处是:

  • 后续 Deploy 时候会用到 这个 Future 1,会通过 handle 给 Future 1 再加上两个后续调用,是在 Future 1 结束之后的后续调用。

回调函数作用是:

  • allocateSlotsFor 函数中有错误处理
  • 后续 Deploy 时候会用到 这个 Future 1,会通过 handle 给 Future 1 再加上两个后续调用,是在 Future 1 结束之后的后续调用。

何时回调

  • 语句在internalAllocateSlot中,但是在 Future 2 回调函数中调用

5.2 流程图

这里比较复杂,先给出流程图

 *  Run in Job Manager
*
* DefaultScheduler#allocateSlotsAndDeploy
* |
* +----> DefaultScheduler#allocateSlots
* | //把ExecutionVertex转化为ExecutionVertexSchedulingRequirements
* |
* +----> DefaultExecutionSlotAllocator#allocateSlotsFor( 调用 1 开始 )
* | // 得到 我们的第一个 CompletableFuture,我们称之为 Future 1
* |
* |
* +--------------> NormalSlotProviderStrategy#allocateSlot
* |
* |
* +--------------> SchedulerImpl#allocateSlotInternal
* | // 生成了第一个 CompletableFuture,以后称之为 allocationResultFuture
* |
* ┌────────────┐
* │ Future 1 │ 生成 allocationResultFuture
* └────────────┘
* │
* │
* +----> SchedulerImpl#internalAllocateSlot( 调用 2 开始 )
* | // Future 1 做为参数被传进来,这里会继续调用,生成 Future 2, Future 3
* |
* |
* +-----------> SchedulerImpl#allocateSharedSlot( 调用 3 开始 )
* | // 这里涉及到 MultiTaskSlot 和 SingleTaskSlot
* |
* +-----------> SchedulerImpl # allocateMultiTaskSlot ( 调用 4 开始 )
* |
* |
* +--------------------> SchedulerImpl # requestNewAllocatedSlot
* |
* |
* +--------------------> SlotPoolImpl#requestNewAllocatedSlot
* | // 这里生成一个 PendingRequest
* | // PendingRequest的构造函数中有 new CompletableFuture<>(),
* | // 所以这里是生成了第三个 Future,注意这里的 Future 是针对 PhysicalSlot
* |
* |
* ┌────────────┐
* │ Future 3 │ 生成 Future<PhysicalSlot>,这个 Future 3 实际是对用户不可见的。
* └────────────┘
* |
* |
* +-----------> SchedulerImpl # allocateMultiTaskSlot( 调用 4 结束 )
* | // 回到 ( 调用 4 ) 这里,得倒 Future 3
* | // 这里得倒了第三个 Future<PhysicalSlot>
* | // 第三是因为从用户角度看,它是第三个出现的
* |
* +-----------------------> slotSharingManager # createRootSlot
* | // 把 Future 3 做为参数传进去
* | // 这里马上生成 Future 2
* | // Future 2 被设置为 multiTaskSlot 的成员变量 slotContextFuture;
* | // 然后forward . thenApply 语句 会 设置为 Future 3 回调 Future 2 的回调函数
* |
* |
* +-----------> SchedulerImpl#allocateSharedSlot
* | // 回到 ( 调用 3 ) 这里
* |
* |
* +-----------------------> SlotSharingManager#allocateSingleTaskSlo
* | // 在 rootMultiTaskSlot 之上生成一个 SingleTaskSlot leaf加入到allTaskSlots。
* | // leaf.getLogicalSlotFuture(); 这个就是Future 2,设置好的
* |
* |
* +-----------> SchedulerImpl#allocateSharedSlot
* | // 还在 ( 调用 3 ) 这里
* | // return leaf.getLogicalSlotFuture(); 返回 Future 2
* |
* |
* ┌────────────┐
* │ Future 2 │
* └────────────┘
* |
* |
* |
* +----> SchedulerImpl#internalAllocateSlot
* | // 回到 ( 调用 2 ) 这里
* | // 设置,在 Future 2 的回调函数中会调用 Future 1
* |
* |
* +----> DefaultExecutionSlotAllocator#allocateSlotsFor
* | // 回到 ( 调用 1 ) 这里
* |
* |
* |
* ┌────────────┐
* │ Future 1 │
* └────────────┘
* |
* |
* +----> createDeploymentHandles
* | // 生成 DeploymentHandle
* |
* |
* +-----------> deployIndividually(deploymentHandles);
* | // 这里会给 Future 1 再加上两个 回调函数,作为 部署回调
* |

下图是为了手机阅读。

5.3 具体执行路径

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许slot共享有以下两点好处:

  • Flink 集群所需的task slots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。
  • 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。

此处执行路径大致如下:

  • DefaultScheduler#allocateSlotsAndDeploy

  • DefaultScheduler#allocateSlots;该过程会把ExecutionVertex转化为ExecutionVertexSchedulingRequirements,会封装包含一些location信息、sharing信息、资源信息等

  • DefaultExecutionSlotAllocator#allocateSlotsFor;我们小节实际是从这里开始分析,这里会进行一系列操作,一层层调用下去。首先这个函数会得到我们的第一个 CompletableFuture,我们称之为 allocationResultFuture,这个名字的由来后续就会知道。这个 slotFuture 会赋值给 SlotExecutionVertexAssignment,然后传递给外面。后续 Deploy 时候会用到 这个 slotFuture,会通过 handle 给 slotFuture 再加上两个后续调用,是在slotFuture结束之后的后续调用。

    • public List<SlotExecutionVertexAssignment> allocateSlotsFor(...) {
      for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) { // 得到第一个 CompletableFuture,具体是在 calculatePreferredLocations 中通过
      CompletableFuture<LogicalSlot> slotFuture =
      calculatePreferredLocations(...).thenCompose(...) ->
      slotProviderStrategy.allocateSlot( // 函数里面生成了第一个CompletableFuture
      slotRequestId,
      new ScheduledUnit(...),
      SlotProfile.priorAllocation(...))); SlotExecutionVertexAssignment slotExecutionVertexAssignment =
      new SlotExecutionVertexAssignment(executionVertexId, slotFuture); slotFuture.whenComplete(
      (ignored, throwable) -> { // 第一个CompletableFuture的回调函数,里其实只是异常处理,后续有人会调用到这里
      pendingSlotAssignments.remove(executionVertexId);
      if (throwable != null) {
      slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
      }
      }); slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
      } return slotExecutionVertexAssignments;
      }
  • NormalSlotProviderStrategy#allocateSlot(slotProviderStrategy.allocateSlot)

  • SchedulerImpl#allocateSlotInternal,这里生成了第一个 CompletableFuture,我们可以称之为 allocationResultFuture

    • private CompletableFuture<LogicalSlot> allocateSlotInternal(...) {
      // 这里生成了第一个 CompletableFuture,我们以后称之为 allocationResultFuture
      final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();
      // allocationResultFuture 会传送进去继续处理
      internalAllocateSlot(allocationResultFuture, slotRequestId, scheduledUnit,
      slotProfile, allocationTimeout);
      // 返回 allocationResultFuture
      return allocationResultFuture;
      }
  • SchedulerImpl#allocateSlot

  • SchedulerImpl#internalAllocateSlot,该方法会根据vertex是否共享slot来分配singleSlot/SharedSlot。这里得到第二个 CompletableFuture,我们以后成为 allocationFuture

    • private void internalAllocateSlot(
      CompletableFuture<LogicalSlot> allocationResultFuture, ...) {
      // 这里得到第二个 CompletableFuture,我们以后称为 allocationFuture,注意目前只是得到,不是生成。
      CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
      allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :
      allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
      // 第二个Future,allocationFuture的回调函数。注意,CompletableFuture可以连续调用多个whenComplete。
      allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
      if (failure != null) { // 异常处理
      cancelSlotRequest(...);
      allocationResultFuture.completeExceptionally(failure);
      } else {
      allocationResultFuture.complete(slot); // 它将回调第一个 allocationResultFuture的回调函数
      }
      });
      }
  • SchedulerImpl#allocateSharedSlot,这里也比较复杂,涉及到 MultiTaskSlot 和 SingleTaskSlot

    • private CompletableFuture<LogicalSlot> allocateSharedSlot(...) {
      // allocate slot with slot sharing
      final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
      scheduledUnit.getSlotSharingGroupId(),
      id -> new SlotSharingManager(id,slotPool,this)); // 生成 SlotSharingManager final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality; if (scheduledUnit.getCoLocationConstraint() != null) {
      multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(...);
      } else {
      multiTaskSlotLocality = allocateMultiTaskSlot(...); // 这里生成 MultiTaskSlot
      } // 这里生成 SingleTaskSlot
      final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(...); return leaf.getLogicalSlotFuture(); // 返回 SingleTaskSlot 的 future,就是第二个Future,具体生成我们在下面会详述
      }
  • SchedulerImpl # allocateMultiTaskSlot,这里是一个难点函数。因为这里生成了第三个 Future ,这里把第三个 Future 提前说明第三是因为从用户角度看,它是第三个出现的

    • private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(...) {
      
       		SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
      
       		if (multiTaskSlot == null) {
      // requestNewAllocatedSlot 会调用 SlotPoolImpl 的同名函数
      // 得到第 三 个 Future,注意,这个 Future 针对的是 PhysicalSlot
      final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(...); // 使用 第 三 个 Future 来构建 multiTaskSlot
      multiTaskSlot = slotSharingManager.createRootSlot(...,slotAllocationFuture,...); // 第 三 个 Future的回调函数,这里会把payload赋值给slot
      slotAllocationFuture.whenComplete(
      (PhysicalSlot allocatedSlot, Throwable throwable) -> {
      final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId); if (taskSlot != null) {
      // 会把payload赋值给slot
      if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {...}
      }
      });
      } return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
      }
  • SchedulerImpl # requestNewAllocatedSlot 会调用 SlotPoolImpl 的同名函数

  • SlotPoolImpl#requestNewAllocatedSlot,这里生成一个 PendingRequest

    • public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(...) {
      
       		// 生成 PendingRequest
      final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile); // 添加 PendingRequest 到 waitingForResourceManager,然后返回Future
      return requestNewAllocatedSlotInternal(pendingRequest)
      .thenApply((Function.identity()));
      }
    • PendingRequest的构造函数中有 new CompletableFuture<>(),所以这里是生成了第三个 Future,注意这里的 Future 是针对 PhysicalSlot

    • requestNewAllocatedSlotInternal

      • private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
        
           if (resourceManagerGateway == null) {
        // 就是把 pendingRequest 加到 waitingForResourceManager 之中
        stashRequestWaitingForResourceManager(pendingRequest);
        } else {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        return pendingRequest.getAllocatedSlotFuture(); // 第三个Future
        }
  • SlotSharingManager#createRootSlot,这里才是生成 第二个 Future 的地方

    • MultiTaskSlot createRootSlot(
      SlotRequestId slotRequestId,
      CompletableFuture<? extends SlotContext> slotContextFuture, // 参数是第三个Future
      SlotRequestId allocatedSlotRequestId) { // 生成第二个Future<SlotContext>
      final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>(); final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(...
      slotContextFutureAfterRootSlotResolution); // 第二个Future 在 createAndRegisterRootSlot 函数中 被赋值为 MultiTaskSlot的 slotContextFuture 成员变量 FutureUtils.forward(
      slotContextFuture.thenApply( // 第三个Future进一步回调时候,会回调第二个Future
      (SlotContext slotContext) -> {
      // add the root node to the set of resolved root nodes once the SlotContext future has
      // been completed and we know the slot's TaskManagerLocation
      tryMarkSlotAsResolved(slotRequestId, slotContext);
      return slotContext;
      }),
      slotContextFutureAfterRootSlotResolution); // 在这里回调第二个Future return rootMultiTaskSlot;
      }
  • SlotSharingManager#allocateSingleTaskSlot,这里的目的是在 rootMultiTaskSlot 之上生成一个 SingleTaskSlot leaf加入到allTaskSlots。

    • SingleTaskSlot allocateSingleTaskSlot(
      SlotRequestId slotRequestId, ResourceProfile resourceProfile,
      AbstractID groupId, Locality locality) { final SingleTaskSlot leaf = new SingleTaskSlot(
      slotRequestId, resourceProfile, groupId, this, locality); children.put(groupId, leaf); // register the newly allocated slot also at the SlotSharingManager
      allTaskSlots.put(slotRequestId, leaf); reserveResource(resourceProfile); return leaf;
      }
  • 最后回到 SchedulerImpl # allocateSharedSlot 函数,return leaf.getLogicalSlotFuture(); 这里也是一个难点,即 getLogicalSlotFuture 返回的是一个 CompletableFuture(就是第二个 Future),但是这个 SingleLogicalSlot 是未来回调时候才会生成。

    • public final class SingleTaskSlot extends TaskSlot {
      private final MultiTaskSlot parent;
      // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
      private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture; private SingleTaskSlot() {
      singleLogicalSlotFuture = parent.getSlotContextFuture()
      .thenApply(
      (SlotContext slotContext) -> {
      return new SingleLogicalSlot( // 未来回调时候才会生成
      slotRequestId,
      slotContext,
      slotSharingGroupId,
      locality,
      slotOwner);
      });
      } CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
      return singleLogicalSlotFuture.thenApply(Function.identity());
      }
      }

0x06 Deploy阶段

注意,此处的入口为 allocateSlotsAndDeploy函数中 的 deployIndividually / waitForAllSlotsAndDeploy 语句

此处执行路径大致如下:

  • DefaultScheduler#allocateSlotsAndDeploy

  • DefaultScheduler#allocateSlots;得到 SlotExecutionVertexAssignment 列表,上节已经详细介绍(该过程会ExecutionVertex转化为ExecutionVertexSchedulingRequirements,会封装包含一些location信息、sharing信息、资源信息等)

  • List deploymentHandles = createDeploymentHandles() 根据SlotExecutionVertexAssignment建立DeploymentHandle

  • DefaultScheduler#deployIndividually 根据deploymentHandles进行部署,其实是根据Future把部署搭建起来,具体如何部署需要在slot分配成功之后再执行。我们小节实际是从这里开始分析,具体代码可以看出,取出了 Future 1 进行一些列操作

    • private void deployIndividually(final List<DeploymentHandle> deploymentHandles) {
      for (final DeploymentHandle deploymentHandle : deploymentHandles) {
      FutureUtils.assertNoException(
      deploymentHandle
      .getSlotExecutionVertexAssignment()
      .getLogicalSlotFuture()
      .handle(assignResourceOrHandleError(deploymentHandle))
      .handle(deployOrHandleError(deploymentHandle)));
      }
      }
  • DefaultScheduler#assignResourceOrHandleError;就是返回函数,以备后续回调使用

    • private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(final DeploymentHandle deploymentHandle) {
      
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
      final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId(); return (logicalSlot, throwable) -> {
      if (throwable == null) {
      final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
      final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
      executionVertex
      .getCurrentExecutionAttempt()
      .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
      executionVertex.tryAssignResource(logicalSlot);
      } else {
      handleTaskDeploymentFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable));
      }
      return null;
      };
      }
  • deployOrHandleError 就是返回函数,以备后续回调使用

    • private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
      
         final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
      final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId(); return (ignored, throwable) -> {
      if (throwable == null) {
      deployTaskSafe(executionVertexId);
      } else {
      handleTaskDeploymentFailure(executionVertexId, throwable);
      }
      return null;
      };
      }

0x07 RM分配资源

之前的工作基本都是在 JM 之中。通过 Scheduler 和 SlotPool 来完成申请资源和部署阶段。目前 SlotPool 之中已经积累了一个 PendingRequest,等 SlotPool 连接上 RM,就可以开始向 RM 申请资源了。

当ResourceManager收到申请slot请求时,若发现该JobManager未注册,则直接抛出异常;否则将请求转发给SlotManager处理,SlotManager中维护了集群所有空闲的slot(TaskManager会向ResourceManager上报自己的信息,在ResourceManager中由SlotManager保存Slot和TaskManager对应关系),并从其中找出符合条件的slot,然后向TaskManager发送RPC请求申请对应的slot。

代码执行路径如下:

  • JobMaster # establishResourceManagerConnection 程序执行在 JM 之中

  • SlotPoolImpl # connectToResourceManager

  • SlotPoolImpl # requestSlotFromResourceManager,这里 Pool 会向 RM 进行 RPC 请求。

    • private void requestSlotFromResourceManager(
      final ResourceManagerGateway resourceManagerGateway,
      final PendingRequest pendingRequest) {
      // 生成一个 AllocationID,这个会传到 TM 那里,注册到 TaskSlot上。
      final AllocationID allocationId = new AllocationID();
      // 生成一个SlotRequest,并且向 RM 进行 RPC 请求。
      CompletableFuture<Acknowledge> rmResponse =
      resourceManagerGateway.requestSlot(
      jobMasterId,
      new SlotRequest(jobId, allocationId,
      pendingRequest.getResourceProfile(),
      jobManagerAddress),
      rpcTimeout);
      }
  • RPC

  • ResourceManager # requestSlot 程序切换到 RM 之中

  • SlotManagerImpl # registerSlotRequest。registerSlotRequest方法会先执行checkDuplicateRequest判断是否有重复,没有重复的话,则将该slotRequest维护到pendingSlotRequests,然后调用internalRequestSlot进行分配,如果出现异常则从pendingSlotRequests中异常,然后抛出SlotManagerException。

    • pendingSlotRequests.put
  • SlotManagerImpl # internalRequestSlot

  • SlotManagerImpl # findMatchingSlot

  • SlotManagerImpl # internalAllocateSlot,此时是没有资源的,需要向 TM 要求资源

    • private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
      final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
      OptionalConsumer.of(findMatchingSlot(resourceProfile))
      .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
      .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
      }
  • SlotManagerImpl # allocateSlot,向task manager要求资源。TaskExecutorGateway接口用来通过RPC分配任务槽,或者说分配任务的资源。

    • TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
      CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
      slotId,
      pendingSlotRequest.getJobId(),
      allocationId,
      pendingSlotRequest.getResourceProfile(),
      pendingSlotRequest.getTargetAddress(),
      resourceManagerId,
      taskManagerRequestTimeout);
  • RPC

  • TaskExecutor # requestSlot,程序切换到 TE

  • TaskSlotTableImpl # allocateSlot,分配资源,更新task slot map,把slot加入到 set of job slots 中。

    • public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId,
      ResourceProfile resourceProfile,Time slotTimeout) {
      taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId);
      taskSlots.put(index, taskSlot);
      allocatedSlots.put(allocationId, taskSlot);
      slots.add(allocationId);
      }

0x08 Offer资源阶段

此阶段是由 TE,TM 开始,就是TE 向 RM 提供 Slot,然后 RM 通知 JM 可以运行 Job。也可以认为这部分是从底向上的执行。

等待所有的slot申请完成后,然后会将ExecutionVertex对应的Execution分配给对应的Slot,即从Slot中分配对应的资源给Execution,完成分配后可开始部署作业。

这里两个关键点是:

  • 当 JM 收到 SlotOffer时候,就会根据 RPC传递过来的 taskManagerId 参数,构建一个 taskExecutorGateway,然后这个 taskExecutorGateway 被赋予为 AllocatedSlot . taskManagerGateway。这样就把 JM 范畴的 Slot 和 Slot 所在的 taskManager 联系起来
  • Execution 部署时候,是 从 SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway 这个顺序获取了 TaskManager 的 RPC 网关,然后通过 taskManagerGateway.submitTask 才能提交任务的。这样就把 Execution 部署阶段和执行阶段联系起来了
---------- Task Executor ----------


┌─────────────┐
│ TaskSlot │ requestSlot
└─────────────┘


┌──────────────┐
│ SlotOffer │ offerSlotsToJobManager
└──────────────┘


------------- Job Manager -------------


┌──────────────┐
│ SlotOffer │ JobMaster#offerSlots(taskManagerId,slots)
└──────────────┘
│ //taskManager = registeredTaskManagers.get(taskManagerId);
│ //taskManagerLocation = taskManager.f0;
│ //taskExecutorGateway = taskManager.f1;


┌──────────────┐
│ SlotOffer │ SlotPoolImpl#offerSlots
└──────────────┘


┌───────────────┐
│ AllocatedSlot │ SlotPoolImpl#offerSlot
└───────────────┘


┌───────────────┐
│ 回调 Future 3 │ SlotSharingManager#createRootSlot
└───────────────┘


┌───────────────┐
│ 回调 Future 2 │ SingleTaskSlot#SingleTaskSlot
└───────────────┘


┌───────────────────┐
│ SingleLogicalSlot │ new SingleLogicalSlot
└───────────────────┘


┌───────────────────┐
│ SingleLogicalSlot │
│ 回调 Future 1 │ allocationResultFuture.complete()
└───────────────────┘


┌───────────────────────────────┐
│ SingleLogicalSlot │
│回调 assignResourceOrHandleError│
└───────────────────────────────┘


┌────────────────┐
│ ExecutionVertex│ tryAssignResource
└────────────────┘


┌────────────────┐
│ Execution │ tryAssignResource
└────────────────┘


┌──────────────────┐
│ SingleLogicalSlot│ tryAssignPayload
└──────────────────┘


┌───────────────────────┐
│ SingleLogicalSlot │
│ 回调deployOrHandleError│
└───────────────────────┘


┌────────────────┐
│ ExecutionVertex│ deploy
└────────────────┘


┌────────────────┐
│ Execution │ deploy // 关键点
└────────────────┘



---------- Task Executor ----------


┌────────────────┐
│ TaskExecutor │ submitTask
└────────────────┘


┌────────────────┐
│ TaskExecutor │ startTaskThread
└────────────────┘

执行路径如下:

  • TaskExecutor # establishJobManagerConnection

  • TaskExecutor # offerSlotsToJobManager,这里就是遍历已经分配的TaskSlot,然后每个TaskSlot会生成一个SlotOffer(里面是allocationId,slotIndex,resourceProfile),这个会通过RPC发给 JM。

    • private void offerSlotsToJobManager(final JobID jobId) {
      final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
      final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId(); final Collection<SlotOffer> reservedSlots = new HashSet<>(2); while (reservedSlotsIterator.hasNext()) {
      SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
      reservedSlots.add(offer);
      }
      // 把 SlotOffer 通过RPC发给 JM
      CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
      jobMasterGateway.offerSlots(
      getResourceID(),
      reservedSlots,
      taskManagerConfiguration.getTimeout());
      }
  • RPC

  • JobMaster # offerSlots 。程序执行到 JM。当 JM 收到 SlotOffer时候,就会根据 RPC传递过来的 taskManagerId 参数,构建一个 taskExecutorGateway,然后这个 taskExecutorGateway 被赋予为 AllocatedSlot . taskManagerGateway。这样就把 JM 范畴的 Slot 和 Slot 所在的 taskManager 联系起来

    • public CompletableFuture<Collection<SlotOffer>> offerSlots(
      final ResourceID taskManagerId,
      final Collection<SlotOffer> slots,
      final Time timeout) { Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId); final TaskManagerLocation taskManagerLocation = taskManager.f0;
      final TaskExecutorGateway taskExecutorGateway = taskManager.f1; final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken()); return CompletableFuture.completedFuture(
      slotPool.offerSlots(
      taskManagerLocation,
      rpcTaskManagerGateway,
      slots));
      }
  • SlotPoolImpl # offerSlots

  • SlotPoolImpl # offerSlot,这里根据 SlotOffer 的信息生成一个 AllocatedSlot,对于 AllocatedSlot 来说,有效信息就是 slotIndex, resourceProfile。提醒,AllocatedSlot implements PhysicalSlot。

    • boolean offerSlot(
      final TaskManagerLocation taskManagerLocation,
      final TaskManagerGateway taskManagerGateway,
      final SlotOffer slotOffer) { // 根据 SlotOffer 的信息生成一个 AllocatedSlot,对于 AllocatedSlot 来说,有效信息就是 slotIndex, resourceProfile
      final AllocatedSlot allocatedSlot = new AllocatedSlot(
      allocationID,
      taskManagerLocation,
      slotOffer.getSlotIndex(),
      slotOffer.getResourceProfile(),
      taskManagerGateway); allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
      if (pendingRequest != null) {
      allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); // 这里取出了 pendingRequest 的 Future, 就是我们之前的 Future 3,进行回调
      if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot))
      {
      // we could not complete the pending slot future --> try to fulfill another pending request
      allocatedSlots.remove(pendingRequest.getSlotRequestId());
      tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
      }
      }
      }
  • 开始回调 Future 3,代码在 SlotSharingManager # createRootSlot 这里

    • FutureUtils.forward(
      slotContextFuture.thenApply(
      (SlotContext slotContext) -> {
      // add the root node to the set of resolved root nodes once the SlotContext future has
      // been completed and we know the slot's TaskManagerLocation
      tryMarkSlotAsResolved(slotRequestId, slotContext); // 运行到这里
      return slotContext;
      }),
      slotContextFutureAfterRootSlotResolution); // 然后到这里
  • 开始回调 Future 2,代码在 SingleTaskSlot 构造函数 ,因为有 PhysicalSlot extends SlotContext, 所以这里就把 物理Slot 映射成了一个 逻辑Slot

    • singleLogicalSlotFuture = parent.getSlotContextFuture()
      .thenApply(
      (SlotContext slotContext) -> {
      return new SingleLogicalSlot( // 回调生成了 SingleLogicalSlot
      slotRequestId,
      slotContext,
      slotSharingGroupId,
      locality,
      slotOwner);
      });
  • 开始回调 Future 1,代码在这里,调用到 后续 Deploy 时候设置的回调函数

    • allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
      if (failure != null) {
      cancelSlotRequest(
      slotRequestId,
      scheduledUnit.getSlotSharingGroupId(),
      failure);
      allocationResultFuture.completeExceptionally(failure);
      } else {
      allocationResultFuture.complete(slot); // 代码在这里
      }
      });
  • 继续回调到 Deploy 阶段设置的回调函数 assignResourceOrHandleError,就是分配资源

    • private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(final DeploymentHandle deploymentHandle) {
      
       		return (logicalSlot, throwable) -> {
      if (executionVertexVersioner.isModified(requiredVertexVersion)) { if (throwable == null) {
      final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
      final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
      executionVertex
      .getCurrentExecutionAttempt()
      .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
      executionVertex.tryAssignResource(logicalSlot); // 运行到这里
      }
      return null;
      };
      }
    • 回调函数会深入调用 executionVertex.tryAssignResource,

    • ExecutionVertex # tryAssignResource

    • Execution # tryAssignResource

    • SingleLogicalSlot# tryAssignPayload(this),这里会把 Execution 自己 赋值给Slot.payload,最后 Execution 在 runtime 的变量举例如下:

      • payload = {Execution@10669} "Attempt #0 (CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:47) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:64)) -> Combine (SUM(1), at main(WordCount.java:67) (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@61c7928f - [SCHEDULED]"
        executor = {ScheduledThreadPoolExecutor@5928} "java.util.concurrent.ScheduledThreadPoolExecutor@6a2c6c71[Running, pool size = 3, active threads = 0, queued tasks = 1, completed tasks = 2]"
        vertex = {ExecutionVertex@10534} "CHAIN DataSource (at getDefaultTextLineDataSet(WordCountData.java:47) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:64)) -> Combine (SUM(1), at main(WordCount.java:67) (1/1)"
        attemptId = {ExecutionAttemptID@10792} "2f8b6c7297527225ee4c8036c457ba27"
        globalModVersion = 1
        stateTimestamps = {long[9]@10793}
        attemptNumber = 0
        rpcTimeout = {Time@5924} "18000000 ms"
        partitionInfos = {ArrayList@10794} size = 0
        terminalStateFuture = {CompletableFuture@10795} "java.util.concurrent.CompletableFuture@2eb8f94c[Not completed]"
        releaseFuture = {CompletableFuture@10796} "java.util.concurrent.CompletableFuture@7c794914[Not completed]"
        taskManagerLocationFuture = {CompletableFuture@10797} "java.util.concurrent.CompletableFuture@2e11ac18[Not completed]"
        state = {ExecutionState@10789} "SCHEDULED"
        assignedResource = {SingleLogicalSlot@10507}
        failureCause = null
        taskRestore = null
        assignedAllocationID = null
        accumulatorLock = {Object@10798}
        userAccumulators = null
        ioMetrics = null
        producedPartitions = {LinkedHashMap@10799} size = 1
  • 继续回调到 Deploy 阶段设置的回调函数 deployOrHandleError,就是部署

    • private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
      
         return (ignored, throwable) -> {
      if (executionVertexVersioner.isModified(requiredVertexVersion)) { if (throwable == null) {
      deployTaskSafe(executionVertexId); // 在这里部署
      } else {
      handleTaskDeploymentFailure(executionVertexId, throwable);
      }
      return null;
      };
      }
    • 回调函数深入调用其他函数

    • DefaultScheduler # deployTaskSafe

    • ExecutionVertex # deploy

    • Execution # deploy。每次调度ExecutionVertex,都会有一个Execution,在此阶段会将Execution的状态变更为DEPLOYING状态,并且为该ExecutionVertex生成对应的部署描述信息,然后从对应的slot中获取对应的TaskManagerGateway,以便向对应的TaskManager提交Task。其中,ExecutionVertex.createDeploymentDescriptor方法中,包含了从Execution Graph到真正物理执行图的转换。如将IntermediateResultPartition转化成ResultPartition,ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。

      • // 这里一个关键点是:Execution 部署时候,是 从 SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway 这个顺序获取了 TaskManager 的 RPC 网关,然后通过 taskManagerGateway.submitTask 才能提交任务的。这样就把 Execution 部署阶段和执行阶段联系起来了
        public void deploy() throws JobException {
        final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
        .fromExecutionVertex(vertex, attemptNumber)
        .createDeploymentDescriptor(
        slot.getAllocationId(),
        slot.getPhysicalSlotNumber(),
        taskRestore,
        producedPartitions.values()); // 这里就是关键点
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); // 在这里通过RPC提交task给了TaskManager
        CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor).thenCompose(Function.identity())
        }
  • TaskExecutor # submitTask, 程序执行到 TE,这就是正式执行了。TaskManager(TaskExecutor)在接收到提交Task的请求后,会经过一些初始化(如从BlobServer拉取文件,反序列化作业和Task信息、LibaryCacheManager等),然后这些初始化的信息会用于生成Task(Runnable对象),然后启动该Task,其代码调用路径如下 Task#startTaskThread(启动Task线程)-> Task#run(将ExecutionVertex状态变更为RUNNING状态,此时在FLINK web前台查看顶点状态会变更为RUNNING状态,另外还会生成了一个AbstractInvokable对象,该对象是FLINK衔接执行用户代码的关键。

    • // 这个方法会创建真正的Task,然后调用task.startTaskThread();开始task的执行。
      public CompletableFuture<Acknowledge> submitTask(
      TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
      // taskSlot.getMemoryManager(); 会获取slot的内存管理器,这里就是分割内存的部分功能
      memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
      // 在Task构造函数中,会根据输入的参数,创建InputGate, ResultPartition, ResultPartitionWriter等。
      Task task = new Task(
      jobInformation,
      taskInformation,
      tdd.getExecutionAttemptId(),
      tdd.getAllocationId(),
      tdd.getSubtaskIndex(),
      tdd.getAttemptNumber(),
      tdd.getProducedPartitions(),
      tdd.getInputGates(),
      tdd.getTargetSlotNumber(),
      memoryManager,
      taskExecutorServices.getIOManager(),
      taskExecutorServices.getShuffleEnvironment(),
      taskExecutorServices.getKvStateService(),
      taskExecutorServices.getBroadcastVariableManager(),
      taskExecutorServices.getTaskEventDispatcher(),
      taskStateManager,
      taskManagerActions,
      inputSplitProvider,
      checkpointResponder,
      aggregateManager,
      blobCacheService,
      libraryCache,
      fileCache,
      taskManagerConfiguration,
      taskMetricGroup,
      resultPartitionConsumableNotifier,
      partitionStateChecker,
      getRpcService().getExecutor()); taskAdded = taskSlotTable.addTask(task);
      task.startTaskThread();
      }
    • 开始了线程了。而startTaskThread方法,则会执行executingThread.start,从而调用Task.run方法。

      • public void startTaskThread() {
        executingThread.start();
        }
  • 最后会执行到 Task,就是调用用户代码。这里的invokable即为operator对象实例,通过反射创建。具体地,即为OneInputStreamTask,或者SourceStreamTask等。以OneInputStreamTask为例,Task的核心执行代码即为OneInputStreamTask.invoke方法,它会调用StreamTask.run方法,这是个抽象方法,最终会调用其派生类的run方法,即OneInputStreamTask, SourceStreamTask等。

    • // 这里的invokable即为operator对象实例,通过反射创建。
      private void doRun() {
      AbstractInvokable invokable = null;
      invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
      // run the invokable
      invokable.invoke();
      }
  • tryFulfillSlotRequestOrMakeAvailable

0x09 Slot发挥作用

有人可能有一个疑问:Slot分配之后,在运行时候怎么发挥作用呢?

这里我们就用WordCount示例来看看。

示例代码就是WordCount。只不过做了一些配置:

  • taskmanager.numberOfTaskSlots 是为了设置有几个taskmanager。
  • 其他是为了调试,加长了心跳时间或者超时时间。
public class WordCount {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
conf.setString("resourcemanager.job.timeout", "18000000");
conf.setString("resourcemanager.taskmanager-timeout", "18000000");
conf.setString("slotmanager.request-timeout", "18000000");
conf.setString("slotmanager.taskmanager-timeout", "18000000");
conf.setString("slot.request.timeout", "18000000");
conf.setString("slot.idle.timeout", "18000000");
conf.setString("akka.ask.timeout", "18000000");
conf.setString("taskmanager.numberOfTaskSlots", "1"); final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params); // get input data
DataSet<String> text = null;
if (params.has("input")) {
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
} else {
// get default test text data
text = WordCountData.getDefaultTextLineDataSet(env);
} DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1); // emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
env.execute("WordCount Example");
} else {
counts.print();
}
} // *************************************************************************
// USER FUNCTIONS
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}

9.1 部署阶段

这里 Slot 起到了一个承接作用,把具体提交部署和执行阶段联系起来

前面提到,当TE 提交一个Slot之后,RM会在这个Slot上提交Task。具体逻辑如下:

每次调度ExecutionVertex,都会有一个Execution。在 Execution # deploy 函数中。

  • 会将Execution的状态变更为DEPLOYING状态,并且为该ExecutionVertex生成对应的部署描述信息。其中,ExecutionVertex.createDeploymentDescriptor方法中,包含了从Execution Graph到真正物理执行图的转换。

    • 如将IntermediateResultPartition转化成ResultPartition
    • ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。
  • 然后从对应的slot中获取对应的TaskManagerGateway,以便向对应的TaskManager提交Task。这里一个关键点是:Execution 部署时候,是 从 SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway 这个顺序获取了 TaskManager 的 RPC 网关。
  • 最后通过 taskManagerGateway.submitTask 提交 Task。

具体代码如下:

// 这里一个关键点是:Execution 部署时候,是 从 SingleLogicalSlot ---> AllocatedSlot ---> TaskManagerGateway 这个顺序获取了 TaskManager 的 RPC 网关,然后通过 taskManagerGateway.submitTask 才能提交任务的。这样就把 Execution 部署阶段和执行阶段联系起来了
public void deploy() throws JobException {
final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
.fromExecutionVertex(vertex, attemptNumber)
.createDeploymentDescriptor(
slot.getAllocationId(),
slot.getPhysicalSlotNumber(),
taskRestore,
producedPartitions.values()); // 这里就是关键点
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); // 在这里通过RPC提交task给了TaskManager
CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor).thenCompose(Function.identity())
}

9.2 运行阶段

这里仅以Split为例子说明,Slot在其中也起到了连接作用,用户从Slot中可以得到其 TaskManager 的host,然后Split会根据这个host继续操作

当 Source 读取输入之后,可能涉及到分割输入,Flink就会进行输入分片的切分。

9.2.1 FileInputSplit 的由来

Flink 一般把文件按并行度拆分成FileInputSplit的个数,当然并不是完全有几个并行度就生成几个FileInputSplit对象,根据具体算法得到,但是FileInputSplit个数,一定是(并行度个数,或者并行度个数+1)。因为计算FileInputSplit个数时,参照物是文件大小 / 并行度 ,如果没有余数,刚好整除,那么FileInputSplit个数一定是并行度,如果有余数,FileInputSplit个数就为是(并行度个数,或者并行度个数+1)。

Flink在生成阶段,会把JobVertex 转化为ExecutionJobVertex,调用new ExecutionJobVertex(),ExecutionJobVertex中存了inputSplits,所以会根据并行并来计算inputSplits的个数。

ExecutionJobVertex 构造函数中有如下代码,这些代码作用是生成 InputSplit,赋值到 ExecutionJobVertex 的成员变量 inputSplits 中,这样就知道了从哪里得倒 Split:

// set up the input splits, if the vertex has any
try {
InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource(); if (splitSource != null) {
try {
inputSplits = splitSource.createInputSplits(numTaskVertices);
if (inputSplits != null) {
splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
}
}
} // 此时splitSource如下:
splitSource = {CollectionInputFormat@7603} "[To be, or not to be,--that is the question:--, Whether 'tis nobler in the mind to suffer, The slings and arrows of outrageous fortune, ...]"
serializer = {StringSerializer@7856}
dataSet = {ArrayList@7857} size = 35
iterator = null
partitionNumber = 0
runtimeContext = null

9.2.2 File Split

这里以网上文章Flink-1.10.0中的readTextFile解读内容为例,给大家看看文件切片大致流程。当然他介绍的是Stream类型。

readTextFile分成两个阶段,一个Source,一个Split Reader。这两个阶段可以分为多个线程,不一定是2个线程。因为Split Reader的并行度时根据配置文件或者启动参数来决定的。

Source的执行流程如下,Source的是用来构建输入切片的,不做数据的读取操作。这里是按照本地运行模式整理的。

Task.run()
|-- invokable.invoke()
| |-- StreamTask.invoke()
| | |-- beforeInvoke()
| | | |-- init()
| | | | |-- SourceStreamTask.init()
| | | |-- initializeStateAndOpen()
| | | | |-- operator.initializeState()
| | | | |-- operator.open()
| | | | | |-- SourceStreamTask.LegacySourceFunctionThread.run()
| | | | | | |-- StreamSource.run()
| | | | | | | |-- userFunction.run(ctx)
| | | | | | | | |-- ContinuousFileMonitoringFunction.run()
| | | | | | | | | |-- RebalancePartitioner.selectChannel()
| | | | | | | | | |-- RecordWriter.emit()

Split Reader的代码执行流程如下:

Task.run()
|-- invokable.invoke()
| |-- StreamTask.invoke()
| | |-- beforeInvoke()
| | | |-- init()
| | | | |--OneInputStreamTask.init()
| | | |-- initializeStateAndOpen()
| | | | |-- operator.initializeState()
| | | | | |-- ContinuousFileReaderOperator.initializeState()
| | | | |-- operator.open()
| | | | | |-- ContinuousFileReaderOperator.open()
| | | | | | |-- ContinuousFileReaderOperator.SplitReader.run()
| | |-- runMailboxLoop()
| | | |-- StreamTask.processInput()
| | | | |-- StreamOneInputProcessor.processInput()
| | | | | |-- StreamTaskNetworkInput.emitNext() while循环不停的处理输入数据
| | | | | | |-- ContinuousFileReaderOperator.processElement()
| | |-- afterInvoke()

9.2.3 Slot的使用

针对本文示例,我们重点介绍Slot在其中的使用。

调用路径如下:

  • DataSourceTask # invoke,此时运行在 TE

  • DataSourceTask # hasNext

    • while (!this.taskCanceled && splitIterator.hasNext())
  • RpcInputSplitProvider # getNextInputSplit

    • CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(   jobVertexID,   executionAttemptID);
  • RPC

  • 来到 JM

  • JobMaster # requestNextInputSplit

  • SchedulerBase # requestNextInputSplit,这里会从 executionGraph 获取 Execution,然后从 Execution 获取 InputSplit

    • public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
      
      		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
      
      		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
      
      		final InputSplit nextInputSplit = execution.getNextInputSplit();
      
      		final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
      
      		return new SerializedInputSplit(serializedInputSplit);
      }
    • 这里 execution.getNextInputSplit() 就会调用 Slot,可以看到,这里先获取Slot,然后从Slot获取其 TaskManager 的host。再从 Vertiex 获取 InputSplit

      • public InputSplit getNextInputSplit() {
        final LogicalSlot slot = this.getAssignedResource();
        final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        return this.vertex.getNextInputSplit(host);
        }
      • public InputSplit getNextInputSplit(String host) {
        final int taskId = getParallelSubtaskIndex();
        synchronized (inputSplits) {
        final InputSplit nextInputSplit = jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
        if (nextInputSplit != null) {
        inputSplits.add(nextInputSplit);
        }
        return nextInputSplit;
        }
        } // runtime 信息如下
        inputSplits = {GenericInputSplit[1]@13113}
        0 = {GenericInputSplit@13121} "GenericSplit (0/1)"
        partitionNumber = 0
        totalNumberOfPartitions = 1
  • 回到 SchedulerBase # requestNextInputSplit,返回 return new SerializedInputSplit(serializedInputSplit);

  • RPC

  • 返回 算子 Task,TE,获取到了 InputSplit,就可以继续处理输入。

    • final InputSplit split = splitIterator.next();
      final InputFormat<OT, InputSplit> format = this.format;
      // open input format
      // open还没开始真正的读数据,只是定位,设置当前切片信息(切片的开始位置,切片长度),和定位开始位置。把第一个换行符,分到前一个分片,自己从第二个换行符开始读取数据
      format.open(split);

0xFF 参考

一文了解 Apache Flink 的资源管理机制

Flink5:Flink运行架构(Slot和并行度)

Flink Slot详解与Job Execution Graph优化

聊聊flink的slot.request.timeout配置

Apache Flink 源码解析(三)Flink on Yarn (2) Resource Manager

Flink on Yarn模式下的TaskManager个数

Flink on YARN时,如何确定TaskManager数

Flink】Flink作业调度流程分析

Flink原理与实现:如何生成ExecutionGraph及物理执行图

Flink源码走读(一):Flink工程目录

flink分析使用之七任务的启动

flink源码解析3 ExecutionGraph的形成与物理执行

Flink 内部原理之作业与调度

Flink之用户代码生成调度层图结构

3. Flink Slot申请

Flink 任务和调度

Flink的Slot是如何做到平均划分TM内存的?

Flink-1.10.0中的readTextFile解读

Flink1.7.2 Dataset 文件切片计算方式和切片数据读取源码分析

flink任务提交流程分析

Flink Parallelism和Slot理解

最新文章

  1. BridgePattern(桥接模式)
  2. Linux服务器安全登录设置记录
  3. 架构模式之REST架构
  4. 基本的MFC多线程
  5. HDU 1455 http://acm.hdu.edu.cn/showproblem.php?pid=1455
  6. PHP 读取EXCEL
  7. HDU 1061 N^N (n的n次方的最后一位)
  8. MySQL进口.sql文件和常用命令
  9. linux的mount(挂载)NFS 共享,命令详解
  10. nodejs+websocket制作聊天室视频教程
  11. vue 2 使用Bus.js进行兄弟(非父子)组件通信 简单案例
  12. java设计模式--观察者模式(Observer)
  13. ABP学习笔记总汇
  14. Eureka源码探索(一)-客户端服务端的启动和负载均衡
  15. mac 下直接给docker容器加映射 mysql 为例
  16. ADB三个进阶使用
  17. UVALive 6893 The Big Painting hash
  18. 〖Android〗联想K860 logcat CM11.0出错信息及解决
  19. 快速排查SQL服务器阻塞语句
  20. R语言学习——数据分析

热门文章

  1. 软件tf版本是什么意思?
  2. 一张图理清 Python3 所有知识点
  3. XCTF-WEB-高手进阶区(1-4)笔记
  4. 2020重新出发,JAVA学前了解,Windosws常用快捷键
  5. LInux回顾与Shell编程
  6. HTTP PUT/POST/PATCH的区别
  7. Linux内核之 内核同步
  8. js对象的数据属性和访问器属性
  9. troubleshoot之:使用JFR分析性能问题
  10. C#分布式登录——jwt