【spark】RDD操作
RDD操作分为转换操作和行动操作。
对于RDD而言,每一次的转化操作都会产生不同的RDD,供一个操作使用。
我们每次转换得到的RDD是惰性求值的
也就是说,整个转换过程并不是会真正的去计算,而是只记录了转换的轨迹。
当遇到行动操作的时候,才会发生真正的计算,从DAG图的源头开始进行“从头到尾”的计算。
常见的操作
操作类型 |
函数名 |
作用 |
转化操作 |
map() |
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD |
flatMap() |
参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD |
|
filter() |
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD |
|
distinct() |
没有参数,将RDD里的元素进行去重操作 |
|
union() |
参数是RDD,生成包含两个RDD所有元素的新RDD |
|
intersection() |
参数是RDD,求出两个RDD的共同元素 |
|
subtract() |
参数是RDD,将原RDD里和参数RDD里相同的元素去掉 |
|
cartesian() |
参数是RDD,求两个RDD的笛卡儿积 |
|
行动操作 |
collect() |
返回RDD所有元素 |
count() |
RDD里元素个数 |
|
countByValue() |
各元素在RDD中出现次数 |
|
reduce() |
并行整合所有RDD数据,例如求和操作 |
|
fold(0)(func) |
和reduce功能一样,不过fold带有初始值 |
|
aggregate(0)(seqOp,combop) |
和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 |
|
foreach(func) |
对RDD每个元素都是使用特定函数 |
除此之外我们还用到过的转换操作还有
1.groupByKey():应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集
2.reduceByKey(func):应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集
其中每个值是将每个Key传入到func中进行聚合。
除此之外我们还用到过的行动操作还有
1.first():返回数据集的第一个元素
2.take(n):以数组形式返回数据集的前n个元素。
示例
转化操作
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
println("======map操作======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map操作======") /* filter操作 */
println("======filter操作======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter操作======") /* flatMap操作 */
println("======flatMap操作======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap操作======") /* distinct去重操作 */
println("======distinct去重======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct去重======") /* union操作 */
println("======union操作======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union操作======") /* intersection操作 */
println("======intersection操作======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection操作======") /* subtract操作 */
println("======subtract操作======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract操作======") /* cartesian操作 */
println("======cartesian操作======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian操作======")
行动操作
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
/* count操作 */
println("======count操作======")
println(rddInt.count())
println("======count操作======") /* countByValue操作 */
println("======countByValue操作======")
println(rddInt.countByValue())
println("======countByValue操作======") /* reduce操作 */
println("======countByValue操作======")
println(rddInt.reduce((x, y) => x + y))
println("======countByValue操作======") /* fold操作 */
println("======fold操作======")
println(rddInt.fold(0)((x, y) => x + y))
println("======fold操作======") /* aggregate操作 */
println("======aggregate操作======")
val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y),
(x, y) => (x._1 + x._2, y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate操作======") /* foreach操作 */
println("======foeach操作======")
println(rddStr.foreach { x => println(x) })
println("======foeach操作======")
最新文章
- [安卓]Android窗口、视图、布局
- linux笔记:RPM软件包管理-rpm命令管理
- hdu 2054
- Struts2 中的值栈的理解
- C#操作Excel基本操作
- UVa 1347 (双线程DP) Tour
- FZU 2129 子序列个数 (动态规划)
- 快速访问WCF服务--ServiceModel 元数据实用工具 (Svcutil.exe)
- 安装solaris_11.2与windows双系统(VM10模拟实现)(二)
- vmware虚拟机上linux操作系统进行tty1~tty6切换方法和具体步骤
- leetcode先刷_Remove Duplicates from Sorted List II
- 《java入门第一季》之面向对象(方法重写问题)
- EIGRP 基础实验
- 【安富莱TCPnet网络教程】HTTP通信实例
- Linux系统基本结构——摘自《循序渐进linux》
- [再寄小读者之数学篇](2014-06-21 Beal-Kaot-Majda type logarithmic Sobolev inequality)
- ArcGIS 10——版本编辑流程
- 【hbuilder】如何根据Geolocation获得的坐标获取所在城市?
- vs 默认的INC和LIB
- ubuntu14.04开启root用户 设置root密码 配置国内镜像源 设置分辨率
热门文章
- Deep Learning -- 数据增强
- C#超级方便的ExpandoObject类别
- idea导入项目出现Unable to import maven project: See logs for details提示(转载)
- PyMySQL防止SQL注入
- Remote System Upgrade With Cyclone III Devices
- 剑指offer(第2版)刷题 Python版汇总
- CSS 之怀疑自己的审美 2 (Day50)
- Python框架之Tornado(请求)
- Wannafly交流赛1_B_硬币【数学】
- idea中git合并切换分支等操作