Spark Streaming揭秘 Day27 Job产生机制
Spark Streaming揭秘 Day27
Job产生机制
今天主要讨论一个问题,就是除了DStream action以外,还有什么地方可以产生Job,这会有助于了解Spark Streaming的本质。
我们从一个print方法触发,来看一下常用的action操作方式:
传统流程
一般来说,产生Job需要DStream的action操作,比如print方法
第一步,在print方法中,返回了一个ForeachDStream
第二步,在ForeachDStream中,通过被DStreamGraph调用generateJob方法,构造了运行的Job,但此时Job并未被调用。
第三步,在JobGenerator中,执行Job中的内容。
例外流程
是不是只有在foreachRDD之类outputDStream的操作中才能产生Job的执行? 让我们来看下transform这个方法。这个方法对当前DStream上的RDD执行某种操作,以RDD为输入,产生一个新的RDD。
请看一下这个方法中compute方法的实现:
transformFunc是构造时传进来的,这个方法在这里会执行,也就说,在这里我们可以进行任意操作,包括执行Job!!!
也就是说,如果在调用transform方法的transformFunc里有action的操作,就会绕过上述传统流程的第三步,精简为两步来执行Job。这个方法的本意是提供一些DStream不支持的操作,但实际是个后门。
这个方法灵活性很高,可以让我们提前获得结果,这个意义是很大的,使用这个特性可以做出一些很巧妙的效果。直接调用意味着每一步都能直接获取结果,那么就可以基于前面的结果进行判断,然后进行后面的操作,比如如下操作。
lines.transform( rdd => {
if (rdd.count > 0) {
sqc.jsonRDD(rdd).registerTempTable("logstash")
val sqlreport = sqc.sql("SELECT message, COUNT(message) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
sqlreport.map(r => (r(0).toString -> Status(r(2).toString.toDouble, r(1).toString.toInt)))
} else {
rdd.map(l => ("" -> Status()))
}
})
很明显,transform中的方法内容更为丰富,这个可以突破DStream上方法都是单一职责的限制。
欲知后事如何,且听下回分解!
DT大数据每天晚上20:00YY频道现场授课频道68917580
最新文章
- destoon二次开发基础代码
- Sum All Primes
- codeforces 361 A - Mike and Cellphone
- Android 布局之TableLayout
- 基于IHttpAsyncHandler的TCP收发器
- 【py分析网页】可能有用的-re去除网页上的杂碎
- Adapter的getView方法详解
- mxml日期显示使用
- java dom4j解析xml实例(2)
- 子集构造法实现NFA的确定化
- 开始你的第一个npm脚本工具
- Language Modeling with Gated Convolutional Networks
- Ionic APP-Web SPA开发进阶(二)Ionic进阶之路由去哪了
- ArcCore重构-头文件引用问题的初步解决
- 获取window.location.href中传的值,并且转换成json数据使用
- PHP日志切割shell
- BCG界面库
- Ehcache3开发入门简介
- jsp 中出现大量红线,而且页面能正常访问
- java实现office文件预览
热门文章
- CSS 创建方式与优先级
- 求可能组合VB源码代写
- js判断图片上传时的文件大小,和宽高尺寸
- python(3)-lambda表达式
- java.lang.IllegalArgumentException: Service Intent must be explicit: Intent
- wince 位图的使用
- 【思路、优化】UVa 11491 - Erasing and Winning
- hihocoder 1237 Farthest Point
- CSE(Corrupted State Exceptions) 严重异常处理办法
- 分享4个未注册*sdn域名