RDD操作分为转换操作和行动操作。

对于RDD而言,每一次的转化操作都会产生不同的RDD,供一个操作使用。

我们每次转换得到的RDD是惰性求值的

也就是说,整个转换过程并不是会真正的去计算,而是只记录了转换的轨迹。

当遇到行动操作的时候,才会发生真正的计算,从DAG图的源头开始进行“从头到尾”的计算。

常见的操作

操作类型

函数名

作用

转化操作

map()

参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

flatMap()

参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD

filter()

参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

distinct()

没有参数,将RDD里的元素进行去重操作

union()

参数是RDD,生成包含两个RDD所有元素的新RDD

intersection()

参数是RDD,求出两个RDD的共同元素

subtract()

参数是RDD,将原RDD里和参数RDD里相同的元素去掉

cartesian()

参数是RDD,求两个RDD的笛卡儿积

行动操作

collect()

返回RDD所有元素

count()

RDD里元素个数

countByValue()

各元素在RDD中出现次数

reduce()

并行整合所有RDD数据,例如求和操作

fold(0)(func)

和reduce功能一样,不过fold带有初始值

aggregate(0)(seqOp,combop)

和reduce功能一样,但是返回的RDD数据类型和原RDD不一样

foreach(func)

对RDD每个元素都是使用特定函数

除此之外我们还用到过的转换操作还有

1.groupByKey():应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集

2.reduceByKey(func):应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集

其中每个值是将每个Key传入到func中进行聚合。

除此之外我们还用到过的行动操作还有

1.first():返回数据集的第一个元素

2.take(n):以数组形式返回数据集的前n个元素。

示例

转化操作

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
println("======map操作======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map操作======") /* filter操作 */
println("======filter操作======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter操作======") /* flatMap操作 */
println("======flatMap操作======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap操作======") /* distinct去重操作 */
println("======distinct去重======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct去重======") /* union操作 */
println("======union操作======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union操作======") /* intersection操作 */
println("======intersection操作======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection操作======") /* subtract操作 */
println("======subtract操作======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract操作======") /* cartesian操作 */
println("======cartesian操作======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian操作======")

行动操作

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)

  

/* count操作 */
println("======count操作======")
println(rddInt.count())
println("======count操作======") /* countByValue操作 */
println("======countByValue操作======")
println(rddInt.countByValue())
println("======countByValue操作======") /* reduce操作 */
println("======countByValue操作======")
println(rddInt.reduce((x, y) => x + y))
println("======countByValue操作======") /* fold操作 */
println("======fold操作======")
println(rddInt.fold(0)((x, y) => x + y))
println("======fold操作======") /* aggregate操作 */
println("======aggregate操作======")
val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y),
(x, y) => (x._1 + x._2, y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate操作======") /* foreach操作 */
println("======foeach操作======")
println(rddStr.foreach { x => println(x) })
println("======foeach操作======")

最新文章

  1. [安卓]Android窗口、视图、布局
  2. linux笔记:RPM软件包管理-rpm命令管理
  3. hdu 2054
  4. Struts2 中的值栈的理解
  5. C#操作Excel基本操作
  6. UVa 1347 (双线程DP) Tour
  7. FZU 2129 子序列个数 (动态规划)
  8. 快速访问WCF服务--ServiceModel 元数据实用工具 (Svcutil.exe)
  9. 安装solaris_11.2与windows双系统(VM10模拟实现)(二)
  10. vmware虚拟机上linux操作系统进行tty1~tty6切换方法和具体步骤
  11. leetcode先刷_Remove Duplicates from Sorted List II
  12. 《java入门第一季》之面向对象(方法重写问题)
  13. EIGRP 基础实验
  14. 【安富莱TCPnet网络教程】HTTP通信实例
  15. Linux系统基本结构——摘自《循序渐进linux》
  16. [再寄小读者之数学篇](2014-06-21 Beal-Kaot-Majda type logarithmic Sobolev inequality)
  17. ArcGIS 10——版本编辑流程
  18. 【hbuilder】如何根据Geolocation获得的坐标获取所在城市?
  19. vs 默认的INC和LIB
  20. ubuntu14.04开启root用户 设置root密码 配置国内镜像源 设置分辨率

热门文章

  1. Deep Learning -- 数据增强
  2. C#超级方便的ExpandoObject类别
  3. idea导入项目出现Unable to import maven project: See logs for details提示(转载)
  4. PyMySQL防止SQL注入
  5. Remote System Upgrade With Cyclone III Devices
  6. 剑指offer(第2版)刷题 Python版汇总
  7. CSS 之怀疑自己的审美 2 (Day50)
  8. Python框架之Tornado(请求)
  9. Wannafly交流赛1_B_硬币【数学】
  10. idea中git合并切换分支等操作