TaskManager接收到来自JobManager的jobGraph转换得到的TDD对象,启动了任务,在StreamInputProcessor类的processInput()方法中

通过一个while(true)中不停的拉取上游的数据,然后调用streamOperator.processElement(record)调用用户实现的方法去处理数据拉取的数据

首先先来看下这个operator对象

然后看看OneInputStreamOperator类的UML

这里所有的实现类没有全部列出,只列了一些代表

看到这里,写过Flink的streamAPI的同学,肯定感觉到很熟悉!!!!!!

这里!不就是我们常写flink代码的那些算子嘛

对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个OneInputStreamOperator,这里具体看一个最熟悉的Fliter

来看一下StreamFilter的processElement方法

!!!这里传入一个数据后,这个userFunction调用了filter方法并且把数据放进去了

当返回true通过这个output.collect发送出去了

这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userFunction包含了用户实现filter算子的逻辑

(!!!!!就是说这个processElement方法会调用用户的逻辑)

(所以这个userFunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientApi添加新功能方法,运行时可以通过这里拿到)

继续

来看看这个output.collect()方法

然后

看到这个,等等等等

我不是从这个processElement()方法进来的吗,怎么又开始调processElement()方法了

难道递归了? 不对不对

这里operator不是上一个operator了,而是这个output对象的(这里是chainOutPut)

看下这个output对象

看下UML类图,也是只列举了重要的

先看chainingOutPut的属性

发现了又出现了OneInputStreamOperator对象

看到这个实现类的名字!chain联想起了什么

Flink会将可以chain在一起的算子在streamGraph转换成jobGraph的时候根据条件chain在一起

一惊!

来分别看一下ChainingOutPut和RecordWriterOutput的collect()方法有什么区别

在chain中

在RecordWriter中

这里chain的ouput,又继续调用了下一个operator的processElement方法,然后又在processElement方法中又调用output.collect( ),collect中又调用了下一个operator的processElement方法

整个过程就是个无限的循环,直到,某一个operator的ouput不为ChainingOutPut,当变为RecordWriterOutput时

上面看到RecordWriterOutput的processElement直接emit发送出去了这个数据,再也没有继续调用processElement方法了

这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了

来看一下UML类图帮助理解

里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)

那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢

回到TaskManager接收到JobManager的TDD以后初始化整个任务的时候

StreamTask.java中invoke方法中

先是初始化了一个OperatorChain,里面其实就是一个数组StreamOperator

在他初始化的时候,其实就是为我们所有的streamOutputs设置了他的output以及会根据jobManager发送过来的TDD(包含信息)

设置成对应的ChainingOutPut还是RecordWriterOutput,chainOutput会设置他的的operator

然后获取了getHeadOperator()其实就是获取了他调用连中的第一个

然后在

将这个第一个operator关联到了inputProcessor对象里面

后面就简单了在inputProcessor.processInput中就进入了while(true)循环拉取上游数据的逻辑

然后

在这里调用的第一个processElement方法就是我们的那个headOperator

这样整个调用责任链就开始从第一个Operator运行起来了

最新文章

  1. nyoj 106背包问题(贪心专题)
  2. vpn是什么?手机vpn是什么?
  3. 主流 SQLServer 迁移到 MySQL 工具对比
  4. Hadoop运行错误纪录
  5. JAVA使用POI读取EXCEL文件的简单model
  6. 轻量级远程调用框架-Hessian学习笔记-Demo实现
  7. JavaScript中值的真真假假(true and false)
  8. 转:LINQ查询返回DataTable类型
  9. 浅谈DevExpress<五>:TreeList简单的美化——自定义单元格,加注释以及行序号
  10. Spring 5.0.0.RC1 - CORS Support 【译文】
  11. JSON数据Key排序
  12. Java后台使用Websocket教程
  13. 关于Idea模块化部署web项目,Web Resource Directories作用
  14. [福大软工] Z班 第7次成绩排行榜
  15. WebClient 上传文件
  16. mysql读写分离[高可用]
  17. session的理解和使用
  18. null id in entry (don't flush the Session after an exception occurs)
  19. 转 Java的 BigDecimal类型比较大小
  20. 【转】Java Spring AOP详解

热门文章

  1. 优雅的对象转换解决方案-MapStruct使用进阶(二)
  2. 最小化docker镜像
  3. 搞定java String校招面试题
  4. CODING 告诉你如何建立一个 Scrum 团队
  5. Nightwatch——自动化测试(端对端e2e)
  6. cmd命令行带参启动程序
  7. (二)对象以及变量的并发访问--synchronized的使用细节,用法
  8. python第三课--函数
  9. 基于vue手写tree插件那点事
  10. SpringBoot内置tomcat启动原理