spark streaming是建立在spark core之上的,也就说spark streaming任务最终执行还是依赖于RDD模型。在转化成最终的RDD模型执行前,spark streaming主要需要处理以下几个问题:

a,每个batch的RDD是怎么根据用户的代码生成的(对应JobGenerator)?

b,数据是怎么从外部接收的(对应receiver)?

c,每个batch的任务是怎么触发的(对应JobGenerator)?

d,怎么保证spark streaming任务的可靠性?

本文主要针对a,b,c这三个问题做深入分析。

1,DStream拓扑结构

当写spark批处理应用时,通过RDD形成了DAG的计算拓扑。类似的,在spark streaming中通过DStream形成了计算模板的拓扑。当定义好DStream的计算模板以后,每个batch就可以基于该模板生成RDD的计算拓扑。以example中streaming的NetworkWordCount为例:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

生成的DStream拓扑结构如下:

以上拓扑结构图中的节点主要分为三类:输入流,一般DStream计算节点,输出流节点。

2,DStream处理的整体流程

当StreamingContext启动以后,streaming任务的整体流程逻辑图如下:

核心要点如下:

1,ReceiverTracker(位于driver端),主要负责对位于executor端的Receiver进行控制。包括通过提交任务启动Receiver,接收Receiver端Block相关的信息汇报等。

2,JobGenerator(位于driver端),主要作用是通过一个定时器定期生成任务。生成任务主要包括四个步骤:

a,根据receiver接收并且上报给ReceiverTracker的信息,生成当前batch的RDD输入数据。

b,根据用户定义的DStream拓扑结构模板生成当前batch的Jobs

c,将步骤b中生成的Job分装成Jobset,交由JobHandler去执行。在Job执行过程中,将有可能触发底层RDD任务提交和计算。

d,通过检查点,保存当前JobGraph的状态。

3,ReceiverSupervisor(位于executor端),主要负责管理executor段的Reciver,包括启动Receiver,保存Reciever接收的数据以及发送相关消息给Driver端的ReceiverTracker。

接下来,将解释一下开头提出的问题

Q1,每个batch的RDD是怎么根据用户的代码生成的(对应JobGenerator)?

首先,应用通过DStream形成了RDD生成的模板。其次,在JobGenerator定时按照batchTime生成的任务的时候,会从输出流开始(ForEachDStream注册),递归地调用DStream中getOrCompute方法,封装成Job。在Job中就包含了每个batch之间的RDD DAG。

Q2,数据是怎么从外部接收的(对应receiver)?

首先,接收数据实在executor端进行的。其次,Receiver持续不断的接受数据,并且将数据通过ReceiverSupervisor借助RecevierHanlder进行保存,最终将数据按block保存,并且向Driver汇报接受的数据信息。

Q3,每个batch的任务是怎么触发的(对应JobGenerator)?

在Driver端的JobGenerator有一个定时器,每隔batchTime时间定期出发一次任务生成。具体要做的事情已阐述。

Q4,怎么保证spark streaming任务的可靠性?

保证可靠性涉及到driver和executor端,在本文中,可以看到的一点是在任务生成以后,会通过检查点方式保存当前JobGraph的状态。其他待后续总结。

最新文章

  1. Eclipse上安装GIT插件EGit及使用
  2. vue 配置文件详解
  3. Javascript的setTimeOut()和setInterval()的定时器用法
  4. 解决IE6,IE7下子元素使用position:relative、父元素使用overflow:auto后,子元素不随着滚动条滚动的问题
  5. 【原】常见CSS3属性对ios&android&winphone的支持
  6. Hadoop FS shell commands
  7. C# ZedGraph 控件各属性以及示例
  8. dev c++ 的一些快捷键
  9. 如何修正导入模型的旋转? How do I fix the rotation of an imported model?
  10. C#调用C++的DLL 数据类型转换
  11. Mac下添加java环境变量
  12. 更改Activity的最底层的布局
  13. USACO 3.1 Humble Numbers
  14. 初学Git命令
  15. 【Java】利用注解和反射实现一个"低配版"的依赖注入
  16. 自动化测试基础篇--Selenium iframe定位问题
  17. Mysql/MariaDB的多主集群实现:Galera Cluster
  18. UVA 10090 Marbles(扩展欧几里得)
  19. keepalived openssl 报错
  20. PHP_OS常量使用方法

热门文章

  1. Hadoop 文件压缩
  2. 从源码理解 ThreadLocal
  3. vue中手机号,邮箱正则验证以及60s发送验证码
  4. java如何判断字符串是否为空(小知识)
  5. IDEA maven dependency自动提示
  6. Silverlight结合Web Service进行文件上传
  7. SPFA 最短路 带负权边的---- 粗了解
  8. LA-3905 (扫描线)
  9. 时间:NSTimer,代码时运行时间段,
  10. CodeForces546D:Soldier and Number Game(筛区间素数因子个数和)