一、以Wordcount为例来分析

1、Wordcount

val lines = sc.textFile()
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))

2、源码分析

###org.apache.spark/SparkContext.scala
###textFile() /**
* 首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素,其实是(key,value)pais
* key是hdfs或文本文件的每一行的offset,value是文本行
* 然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionRDD
* MapPartitionRDD内部的元素,其实就是一行一行的文本行
* @param path
* @param minPartitions
* @return
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map( pair => pair._2.toString).setName(path)
} ###org.apache.spark.rdd/RDD.scala def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
} def map[U: ClassTag](f: T => U): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
} 其实RDD里是没有reduceByKey的,因此对RDD调用reduceByKey()方法的时候,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,
会在RDD中找到rddToPairRDDFunctions()隐式转换,然后将RDD转换为PairRDDFunctions。 implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
} 接着会调用PairRDDFunctions中的reduceByKey()方法; def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
} ###org.apache.spark.rdd/RDD.scala def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
} foreach调用了runJob方法,一步步追踪runJob方法,首先调用SparkContext的runJob: def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
} … 最后:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// 调用SparkContext,之前初始化时创建的dagScheduler的runJob()方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

最新文章

  1. css3深入了解之奇技淫巧
  2. Android -- 自定义控件(ImageButton)
  3. bootstrap和jquery mobile的对比
  4. ubuntu下的wps不能使用中文.
  5. The EM Algorithm
  6. 从零开始学ios开发(四):IOS控件(1),Image View、Text Field、Keyboard
  7. SQL注入常用语句
  8. 函数fil_extend_space_to_desired_size
  9. 风云CM - 算法分析 & genkey实现
  10. C 和 OC 字符串转换 NSString 和 char * 转换 const char* 与 char *
  11. $translate 的用法
  12. CentOS 6.5安装MongoDB 2.6(多yum数据源)
  13. html中 submit和button的区别?
  14. react-router路由地址变了页面却没有跳转的解决办法
  15. 大家都知道fastclick能解决300ms延迟,现在我们来看一下,使用方法
  16. pytest自动化7:assert断言
  17. C++的qsort函数
  18. MySQL面试试题与答案
  19. Movavi Video Editor 15 Plus Mac怎样更改视频的分辨率?
  20. bzoj1861

热门文章

  1. webapi session
  2. 【洛谷 P3346】 [ZJOI2015]诸神眷顾的幻想乡(后缀自动机)
  3. HttpClinet工具类
  4. input file 无法打开手机端文件选择器
  5. echart 人头
  6. 造成thrift 编译构建项目失败的原因之一:thrift环境变量没设置
  7. Js网站开发学习第一天
  8. mysql查看当前实时连接数
  9. HTML&CSS基础-字体的样式
  10. yum源仓库的三种搭建方式