本文源码基于flink1.14

在帮助用户排查任务的时候,经常会发现部分task处理的慢,在Exactly once语义时需要等待快照的对齐而白白柱塞的情况

在flink1.11版本引入了非对齐的checkpoint,来解决这种柱塞问题,所以来看看这个新特性的源码是如何实现的

先看下官网的图来总的说下实现原理,再来看看源码

flink是基于Chandy-Lamport算法来实现全局快照的,其核心就是在数据中间穿插barrier

当一个task上游同一批次所有的barrier到齐时,就可以触发快照状态的保存了,问题就是出在这里,等待对齐

来看下上面官网的图,是官网优化的一个具体思路

当某task的第一个barrier到时,那内部当前批次的状态必然是不完整的,那多久才算完整呢,等到这批checkpoint的数据全部都到齐都处理完,那状态就完整了

那当第一个barrier到的时候,剩下没到的数据在哪呢,答案就是,上游task的outBuffer(ResultSubpartition)和自己inBuffer(inputGates)里面

ok.分析到这里就可以看flink的思路了,如果当第一个barrier来的时候我不能触发checkpoint, 是因为还有部分数据没有处理到

那干脆就直接把这部分还没处理的数据(在buffer里面的数据),连同状态数据一起保存到checkpoint里面不就行了吗   ???

在从checkpoint恢复的时候就先把这部分buffer数据, 先恢复到当前task的buffer里面,继续计算就可以了,其实弱化了每个checkpoint批次的概念

这样一来当收到第一个barrier的时候,就可以直接触发checkpoint了

下面就是来看看Flink 源码的实现了

看下熟悉的StreamTask因为barrier在flink里面依然被当做数据的一种,在主循环里面看看接收到以后做了什么

先看输入inputBuffer的保存

在AbstractStreamTaskNetworkInput中接收数据的时候从inputGate拉取数据的时候

可以看到会根据数据的类型,如果是barrier类型会走到processBarrier方法

注意这里的这个barrierHandler是SingleCheckpointBarrierHandler实现类,因为非对齐模式的话收到第一个barrier就触发checkpoint了,所以也等同于sigle了

这里的state是实现类AlternatingWaitingForFirstBarrierUnaligned是非对齐模式特有的

来看看怎么处理的

可以看到在 SubtaskCheckpointCoordinatorImpl 中会准备inputFlight数据的快照,目的肯定就是为了保持到checkpoint中

这个方法prepareInflightDataSnapshot方法看下

会调用 BiFunctionWithException prepareInputSnapshot这个action对象,这个对象从哪里传进来的呢?

原来在StreamTask构造函数的时候就通过自己的prepareInputSnapshot方法来创建这个Function了

来看下这个方法的逻辑

会遍历所有的inputProcess然后调用它的准备快照方法

这个方法里面具体

就将具体的input的数据保存到state里面去了

input的保存就说完了

接着来看output缓存数据的保存

会到最开始的AlternatingWaitingForFirstBarrierUnaligned类当保存完input buffer的数据以后initInputsCheckpoint方法之后

来看下后面的逻辑

当触发完input数据的保存以后,就是触发全局的checkpoint了,这里会一直走到streamTask的triggerCheckpointOnBarrier在里面会走到performCheckpoint

最后在SubtaskCheckpointCoordinatorImpl类中

最后在 BufferWritingResultPartition 类里面

PipelinedSubpartition调用addbuffer然后将channelStateWriter.addOutputData把output buffer的数据保存到状态里面去了

讲完触发checkpoint保存缓存中的数据,接下来就是从chekpoint恢复的时候怎么恢复这些未处理的数据了

来看下StreamTask如果从chekpoint恢复的是否是如何处理的

restore方法调用了restoreGates

这里就是将数据in buffer状态的保存到InputGate, 然后out buffer 的状态数据保存到ResultPartitionWriter里面去,继续处理了

over

最新文章

  1. Allegro 导入DXF文件,保留布好的线路信息
  2. (转) silverlight 样式学习
  3. Python的平凡之路(7)
  4. 让我们一起Go(十二)
  5. Nginx和PHP-FPM的启动/重启脚本 [转发]
  6. Android仿微信图片上传,可以选择多张图片,缩放预览,拍照上传等
  7. [LeetCode] Search in a Binary Search Tree 二叉搜索树中搜索
  8. MySQL 索引的增删查
  9. Access restriction: The type 'Unsafe' is not API
  10. Spring 的属性注入
  11. 图解 Paxos 一致性协议
  12. Go语言 数据类型,流程控制
  13. 51nod 1239 欧拉筛模板
  14. Objective-C语法之nonatomic和atomic之间的区别
  15. 2018.08.27 [Usaco2017 Jan]Promotion Counting(线段树合并)
  16. MYSQL 入门全套
  17. [Node.js] Level 6. Socket.io
  18. 回文树(统计所有回文串的个数) - MCCME 1750 Подпалиндромы
  19. Verilog MIPS32 CPU(六)-- MDU
  20. vue.js中引入其他文件export的方法:

热门文章

  1. [BUUCFT]PWN——pwn2_sctf_2016
  2. android 使用 perfetto 抓取atrace
  3. LuoguP5238 整数校验器 题解
  4. JAVA通过正则匹配html里面body标签的内容,去掉body标签
  5. JAVA生成订单编号工具类
  6. 【LeetCode】Gas Station 解题报告
  7. 【LeetCode】9. Palindrome Number 回文数
  8. 【LeetCode】896. Monotonic Array 解题报告(Python)
  9. Primitive Roots(poj1284)
  10. Generating Adversarial Examples with Adversarial Networks