本文源码基于flink1.14

上一篇文章分析了《flink的minibatch微批处理》的源码

乘热打铁分析一下两阶段聚合的源码,因为使用两阶段要先开启minibatch,至于为什么后面会分析到

两阶段聚合的原理,还是简单提一下

如下图,当聚合发生热点的时候,可以在聚合前,先进行一个本地的聚合,先减小数据量,后接正常的数据交换以后聚合,来达到一个解热点的目的,

先来看下两阶段聚合的Calcite优化rule

看下什么情况会匹配上

并且在onmatch方法中会判断开启了minibatch,以及二阶段聚合的时候会调用

来看下具体逻辑match方法

整个两阶段聚合会将原来的一个StreamPhysicalGroupAggregate物理节点,转换成一个

StreamPhysicalLocalGroupAggregate本地聚合节点 + StreamPhysicalGlobalGroupAggregate聚合节点

来看下这个新添加的StreamPhysicalLocalGroupAggregate本地聚合算子的计算逻辑是什么样子的

StreamExecLocalGroupAggragate就是StreamPhysicalLocalGroupAggregate本地聚合具体的ExecNode节点了

来看下具体的operator

看到这里是不是看到了熟悉的 MapBundleOperator ,如果看过上一篇minibatch优化的就知道,两阶段提交也是使用的这个有界operator作为抽象

在了解一下这个MapBundleOperator

就是每来一条数据,都会调用传入的fun的addInput方法

然后把每个key的结果put保存在一个本地变量,就是个map<Rowdata,Rowdata>里面

然后调用自己的trigger触发器,当这条数据可以触发触发器就会调用finishBundle

这里说到触发器,回到初始化mapBundle的时候通过createMiniBatchTrigger创建的一个minibatch的触发器,看看具体逻辑

其实就是一个普通的count触发器,触发条件就是直接使用的minibatch配置的size参数,  所以这里知道了为什么两阶段提交要先开minibatch了

先看下每来一条数据会触发的addInput方法,在来看看攒一个批次后触发的finishBundle

minibatch会包装成一个MiniBatchLocalGroupAggFunction这个funtion的addInput来看看

就是来一条数据直接调用聚合函数的accumulate直接计算结果了,虽然计算结果但是还没有往下游发送

来看下当攒一批后,集体是怎么往下游发送的 finishBundle 方法

结果都已经计算好了,攒一个批次还能干嘛,就是把当前的计算结果往下游发送呗

那整个二次聚合的优化就讲完了

总结一下

sql会将agg拆成 localminiagg + agg

先在本地聚合localConbine一遍,再往下游发送

下游就正常聚合,优化了热点的问题

最新文章

  1. python学习09——字典(3)
  2. 使用npm安装一些包失败了的看过来(npm国内镜像介绍)
  3. SpringMVC使用@PathVariable,@RequestBody,@ResponseBody,@RequestParam,@InitBinder
  4. ASP.NET MVC下的四种验证编程方式[续篇]【转】
  5. [c/c++]指针数组 pk 数组指针
  6. [Jquery] jQuery.cookie帮助类 (转载)
  7. Solr4.8.0源码分析(16)之SolrCloud索引深入(3)
  8. 邮件协议(SMTP)性能测试总结(Foxmail邮箱)
  9. hdr_beg(host) 主机名开始
  10. ICT工作的思考&amp;lt;两&amp;gt;
  11. ASP.NET 5:初始化数据库
  12. 大数据量情况下求top N的问题
  13. 在CentOS 7.3 中安装 NVIDIA GT730 显卡驱动
  14. 手 Q 人脸识别动画实现详解
  15. Beta 第一天
  16. MVC设计思想
  17. python学习记录20181220
  18. XML——对XML文档的创建与增删改查
  19. SQLServer之创建非聚集索引
  20. LwIP Application Developers Manual9---LwIP and multithreading

热门文章

  1. linux查询健康状态,如何直观的判断你的Linux系统是否健康
  2. 联盛德 HLK-W806 (七): 兼容开发板 LuatOS Air103
  3. 在写易买网时产生的错误 JSTL标签库中&lt;c:choose&gt;&lt;/c:choose&gt;不能放JSP页面&lt;!-- --&gt;注释
  4. Mysql配置文件 扩展详细配置
  5. Nginx模块之ngx_http_gzip_module
  6. Offset函数(Excel函数集团)
  7. Swagger如何匹配多个Controller类或者目录
  8. 12 - Vue3 UI Framework - 打包发布
  9. CF918B Radio Station 题解
  10. 资源分享 | PyTea:不用运行代码,静态分析pytorch模型的错误