Spark-Dependency
1、Spark中採用依赖关系(Dependency)表示rdd之间的生成关系。Spark可利用Dependency计算出失效的RDD。在每一个RDD中都存在一个依赖关系的列表
private var dependencies_ : Seq[Dependency[_]] = null
用以记录各rdd中各partition的parent partition。
2、Spark中存在两类Dependency:
1)NarrowDependency表示的是一个父partition仅相应于一个子partition。这种依赖关系是不须要shuffle的。在这类依赖中。能够依据getParents方法获取某个partition的父partitions:
/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* 唯一的接口。获得该partition的全部parent partition
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
}
这类又可分为:
a、OneToOneDependency:表示一一相应的依赖关系,因为在这样的依赖中父partition与子partition Id是一致的,所以getParents直接原样返回。相应的转换操作有map和filter
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
/**
* 事实上partitionId就是partition在RDD中的序号, 所以假设是一一相应, 那么parent和child中的partition的序号应该是一样的
*/
override def getParents(partitionId: Int) = List(partitionId)//原样返回
}
b、PruneDependency(org.apache.spark.rdd.PartitionPruningRDDPartition):未详
/**
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) { @transient
val partitions: Array[Partition] = rdd.partitions
.filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } override def getParents(partitionId: Int) = {
List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
}
}
c、RangeDependency:这样的是父rdd的连续多个partitions相应子rdd中的连续多个partitions。相应的转换有union
/**Union
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD parent RDD中区间的起始点
* @param outStart the start of the range in the child RDD child RDD中区间的起始点
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = {
if (partitionId >= outStart && partitionId < outStart + length) {//推断partitionId的合理性,必须在child RDD的合理partition范围
List(partitionId - outStart + inStart)//算出parent RDD中相应的partition id
} else {
Nil
}
}
}
2)WideDependency:这样的依赖是指一个父partition能够相应子rdd中多个partitions。因为须要对父partition进行划分,故须要用到shuffle,而shuffle通常是採用键值对的。
这里为每一个shuffle分配了一个全局唯一的shuffleId。
为了进行shuffle。须要指定怎样进行shuffle,这相应于參数partitioner;因为shuffle是须要网络传输的。故须要进行序列化Serializer。在宽依赖中并无法获得partition相应的parent partitions?
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,//须要给出partitioner, 指示怎样完毕shuffle
val serializer: Serializer = null)//shuffle不象map能够在local进行, 往往须要网络传输或存储, 所以须要serializerClass
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) { val shuffleId: Int = rdd.context.newShuffleId()//每一个shuffle须要分配一个全局的id, context.newShuffleId()的实现就是把全局id累加 rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
最新文章
- SQLSERVER 使用 ROLLUP 汇总数据,实现分组统计,合计,小计
- AngularJS的简单使用
- Java接口之间的继承
- JavaScript强化教程——jQuery UI API 类别
- ---Ubuntu 14.04 虚拟机器和主机时间同步
- iOS中plist的创建,数据写入与读取
- Qt StyleSheet皮肤(黑色,比较好看,而且很全)
- Git管理unity3d项目
- 自定义 SharePoint 2010 快速启动栏和顶部链接栏
- 使用WiX Toolset创建.NET程序发布Bootstrapper(安装策略管理)(一)-----初识WiX (转)
- CSS HACK的方法
- VC++如何在程序中用代码注册和卸载ocx控件(代码)
- Ubuntu14.04上安装tftpd服务
- js 判断字符串中是否有某字符串
- Android.mk简单分析
- linux下文件和目录的颜色表示
- 【原创】控制perl和python脚本执行过程中脚本文件是否关闭的方法
- MSSQL在线文件还原脚本
- WIN10文件无法自动刷新问题解决方法
- NAT ------ 内网的主机如何通过路由器与外网的主机通信
热门文章
- 【Kruskal】Slim Span
- BZOJ 1588 [HNOI2002]营业额统计(双向链表)
- 【LIS】【递推】Gym - 101246H - ``North-East&#39;&#39;
- 【贪心】 Codeforces Round #419 (Div. 1) A. Karen and Game
- 【带权并查集】Gym - 100923H - Por Costel and the Match
- [CF340D]Bubble Sort Graph/[JZOJ3485]独立集
- cookie和localStorage、sessionStorage的区别
- HDU 5631 Rikka with Graph 暴力 并查集
- Matlab中如何读取.dat文件
- Android关于JSON数据解析