RDD创建

在Spark中创建RDD的创建方式大概可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。

  1. 由一个已经存在的Scala集合创建,集合并行化,而从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

两个函数的声明

def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看Spark中是怎么实现这个makeRDD函数的:

def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}

我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数函数实现了,它接收的参数类型是Seq[(T, Seq[String])],Spark文档的说明是:

  Distribute a local Scala collection to form an RDD,
with one or more location preferences (hostnames of Spark nodes)
for each object. Create a new partition for each collection item.

原来,这个函数还为数据提供了位置信息,来看看我们怎么使用:

scala> val guigu1= sc.parallelize(List(1,2,3))
guigu1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21 scala> val guigu2 = sc.makeRDD(List(1,2,3))
guigu2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21 scala> val seq = List((1, List("slave01")),| (2, List("slave02")))
seq: List[(Int, List[String])] = List((1,List(slave01)),
(2,List(slave02))) scala> val guigu3 = sc.makeRDD(seq)
guigu3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23 scala> guigu3.preferredLocations(guigu3.partitions(1))
res26: Seq[String] = List(slave02) scala> guigu3.preferredLocations(guigu3.partitions(0))
res27: Seq[String] = List(slave01) scala> guigu1.preferredLocations(guigu1.partitions(0))
res28: Seq[String] = List()

我们可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:


def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
} def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}

都是返回ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。

由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

scala> val atguigu = sc.textFile("hdfs://hadoop102:9000/RELEASE")
atguigu: org.apache.spark.rdd.RDD[String]
= hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

本博客仅为博主学习总结,感谢各大网络平台的资料。蟹蟹!!

最新文章

  1. iOS开发系列--App扩展开发
  2. win 2012 修改盘符
  3. 【codevs1409】 拦截导弹 2
  4. Android-Socket传输 GPRS网络
  5. android旋转动画和平移动画具体解释,补充说一下假设制作gif动画放到csdn博客上
  6. h5开发app之在线生成二维码
  7. Java通过jxl解析Excel文件入库,及日期格式处理方式 (附源代码)
  8. C# 文件的操作
  9. SpringMVC源码情操陶冶-ResourcesBeanDefinitionParser静态资源解析器
  10. (NO.00005)iOS实现炸弹人游戏(二):素材选择的取舍
  11. C# 基于密码的身份验证报错问题System.Net.NetworkCredential
  12. Windows【端口被占用,杀死想啥的端口】
  13. dao层、service和action的运用和区别
  14. 区间DP经典 石子合并
  15. Linux运维工程师应具备哪些技能?
  16. Spring,SpringMvc配置常见的坑,注解的使用注意事项,applicationContext.xml和spring.mvc.xml配置注意事项,spring中的事务失效,事务不回滚原因
  17. 【oracle常见错误】oracle监听程序配置/“ORA-12541: TNS: 无监听程序”
  18. ansible 问题
  19. 11-Python3从入门到实战—基础之生成器和迭代器
  20. 洛谷P2257 YY的GCD

热门文章

  1. tftp的安装及配置
  2. 自定义QT窗口部件外观之QStyle
  3. c# log4net 配置使用
  4. 【canvas】基础练习一 图形
  5. Hive 学习之路(五)—— Hive 分区表和分桶表
  6. Sqoop 简介与安装
  7. spring 5.x 系列第2篇 —— springmvc基础 (代码配置方式)
  8. JS工具整理
  9. laravel-admin(自定义表单视图)
  10. 建设DevOps统一运维监控平台,全面的系统监控 Zabbix VS Nagios VS Open-Falcon OR Prometheus