一、数据处理原理剖析

每隔我们设置的batch interval 的time,就去找ReceiverTracker,将其中的,从上次划分batch的时间,到目前为止的这个batch interval time间隔内的block封装为一个batch;

其次,会将这个batch中的数据,去创建为一个初始的RDD,一个batch内,在这段时间封装了几个block,就代表这个batch对应的RDD内会有几个partition;

这个batch对应的RDD的partition决定了数据处理阶段的并行度,这个跟调优关系很大,如果想增加数据处理阶段的性能,就考虑增加并行度,那么就考虑缩短block interval;

只有output操作中,使用了ForEachStream,其中定义了generatorJob()方法,在数据处理阶段,才触发针对接收到的一个一个batch的数据,触发小的job,去处理该batch的数据;

最后一步,去找JobScheduler去调度job,job的输入RDD,就是batch对应的RDD;

二、源码分析

入口,JobGenerator的generateJobs()方法

###org.apache.spark.streaming.scheduler/JobGenerator.scala

 /**
* 定时,调度generateJobs()方法,传入一个time,其实就是一个batch interval内的时间段
*/
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// 找到ReceiverTracker,调用其allocateBlocksToBatch方法,将当前时间段内的block分配给一个batch,并为其
// 创建一个RDD
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
// 调用DSteamGraph的generateJobs()来根据程序定义的DSteam之间的依赖关系和算子,生成job
graph.generateJobs(time) // generate jobs using allocated block
} match {
// 如果成功创建了job
case Success(jobs) =>
// 从ReceiverTracker中,获取当前batch interval对应的block数据
val receivedBlockInfos =
jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
// 用jobScheduler提交job,其对应的原始数据,是那批block
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}

最新文章

  1. c#版在pc端发起微信扫码支付
  2. 继续转 [转]php版本的cron定时任务执行器
  3. 使用xml来显示获取的mysql数据
  4. 线性结构CT 02-线性结构1 一元多项式的乘法与加法运算
  5. 使用cookie解决微信不能存储localStorage的问题
  6. Breaking parallel loops in .NET C# using the Stop method z
  7. Linux下安装Android的adb驱动-解决不能识别的问题
  8. Python自动化运维之17、Python操作 Memcache、Redis、RabbitMQ
  9. Win7 和 MAC 系统通过VMware共享文件夹(简单又好用,几乎什么都不用设置)
  10. Win10 VS2012 无法注册IIS4.0 解决方案
  11. C++ Opencv remap()重映射函数详解及使用示例
  12. 百度站内搜索https不可用切换api搜索,加上谷歌api站内搜索
  13. 《蹭课神器》Alpha版使用说明
  14. C#连接数据库MD5数据库加密
  15. Xshell5中常用linux服务器命令集合
  16. Spring4新特性——集成Bean Validation 1.1(JSR-349)到SpringMVC
  17. time out 超时
  18. 【leetcode 简单】 第九十五题 数字转换为十六进制数
  19. 「小程序JAVA实战」小程序视图之细说列表渲染(14)
  20. vue数据响应的坑

热门文章

  1. Kafka Replication: The case for MirrorMaker 2.0
  2. 定时任务-Windows任务
  3. ASP.NET WebApi 学习与实践系列(2)---WebApi 路由请求的理解
  4. github上传本地项目代码
  5. redux reducer笔记
  6. mvc和mvvm模式
  7. tcp、udp协议栈
  8. Github强制找回管理员账号密码
  9. mysql修改表结构,添加double类型新列
  10. ETL 的一些概念