常见的transformation算子
RDD:RDD分区数,若从HDFS创建RDD,RDD的分区就是和文件块一一对应,若是集合并行化形式创建,RDD分区数可以指定,一般默认值是CPU的核数。
task:task数量就是和分区数量对应。
这个全:https://www.cnblogs.com/frankdeng/p/9301672.html
一、transformation算子:
(1)map(func):将函数应用于RDD中的每一个元素,将返回值构成新的RDD。输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。
rdd.map(x=>x+1)
如:{1,2,3,3} 结果为 {2,3,4,4}
hadoop fs -cat /tmp/lxw1234/1.txt
hello world
hello spark
hello hive //读取HDFS文件到RDD
scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子
scala> var mapresult = data.map(line => line.split("\\s+"))
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
//结果
scala> mapresult.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
(2)flatMap(func):比map多一步合并操作,首先将数组元素进行映射,然后合并压平所有的数组。
//使用flatMap算子
scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
//结果
scala> flatmapresult.collect
res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
参考博客:https://www.cnblogs.com/devin-ou/p/8028305.html
(3)mapPartitions(func):函数中传入的参数是迭代器,迭代器里面保存的是一个分区里面的数据。
/** * makeRDD方法的第一个参数代表的是RDD中的 元素 * 第二个参数:RDD的分区数 * rdd[Int] */ val rdd = sc.makeRDD(1 to 10,3) /** * mapPartitions这个算子遍历的单位是partition * 会将一个partition的数据量全部加载到一个集合里面 */ val mapPartitonsRDD = rdd.mapPartitions(iterator=>{ val list = new ListBuffer[Int]() //创建一个数据库连接 while(iterator.hasNext){ val num = iterator.next() list.+=(num+100) } //批量插入数据库 list.iterator }, false) /** * 想要执行,必须有action类的算子 * collect算子会将集群中计算的结果回收到Driver端,慎用 */ val resultArr = mapPartitonsRDD.collect() resultArr.foreach { println }
map和mapPartition的异同:
mapPartition function一次处理一个分区的数据,性能比较高;
map的function一次只处理一条数据。
如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
SparkSql或DataFrame默认会对程序进行mapPartition的优化。
参考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479
(4)distinct:对RDD中的元素进行去重操作。
scala> data.flatMap(line => line.split("\\s+")).collect
res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark) scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res62: Array[String] = Array(hive, hello, world, spark, hi)
(5)reduceByKey(func,[numTask]):找到相同的key,对其进行聚合,聚合的规则由func指定。
reduce任务的数量可以由numTask指定
goodsSaleRDD.reduceByKey((x,y) => x+y)
参考博客:https://www.jianshu.com/p/af175e66ce99
(6)groupByKey():对相同的key进行分组。
(7)aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)
第一个参数代表着 初始值
第二个参数是中间聚合,在每个分区内部按照key执行聚合操作。这个分两步,第一步先将每个value和初始值作为函数参数进行计算,返回的结果作为新的kv对。然后在对结果再带入到函数中计算。
第三个参数是最终聚合,对中间聚合结果进行最终聚合。
例如:一个RDD有两个分区,
patition1:(1,1) (1,2) (2,1)
patition2:(2,3)(2,4)(1,7)
首先,在每个patition中将value和初始值三带入到seqFunc函数中,得到中间结果kv:
patition1:(1,3) (1,3) (2,3)
patition2:(2,3)(2,4)(1,7)
再将中间结果kv带入到seqFunc函数中,按照key进行聚合
patition1:(1,3)(2,3)
patition2:(2,4)(1,7)
最后,进行整体聚合,将上一步结果带入combFunc
(1,10)(2,7)
def seqFunc(a,b):
print "seqFunc:%s,%s" %(a,b)
return max(a,b) #取最大值
def combFunc(a,b):
print "combFunc:%s,%s" %(a ,b)
return a + b #累加起来
'''
aggregateByKey这个算子内部肯定有分组
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
参考博客:https://blog.csdn.net/qq_35440040/article/details/82691794 这个写的挺乱,但有地方可以参考
(8)combineByKey ( createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C,C) =>C ) :
主要分为三步,第一步,对value进行初始化处理;第二步,在分区内部对(key,value)进行处理,第三步,所有分区间对(key,value)进行处理。
https://www.jianshu.com/p/b77a6294f31c
参考博客:https://www.jianshu.com/p/b77a6294f31c
(9)sortBy():排序操作
最新文章
- 在Ubuntu X64上编译Hadoop
- 集DDD,TDD,SOLID,MVVM,DI,EF,Angularjs等于一身的.NET(C#)开源可扩展电商系统–Virto Commerce
- 【Android】Handler使用入门
- JDBC驱动的四种类型
- 传说中的Markov";不过如此”
- 网页JavaScript
- ASP.NET 动态编译、预编译和 WebDeployment 项目(转)
- 选址问题lingo求解
- Android学习总结——Activity状态保存和恢复
- 64位系统/32位系统下/8位CPU的数据宽度
- 深入struts2(三)---工作机制和运行流程图
- RMAN-06217: not connected to auxiliary database with a net service name
- php7连接mysql测试代码
- maven打包如何跳过测试
- vi/vim 三种模式的操作
- 用好lua+unity,让性能飞起来——luajit集成篇/平台相关篇
- git push 远程新分支
- 【转载】实用VC++6.0插件
- ajax参考增删改查
- python自动化运维之路06
热门文章
- 使用docker-compose快速搭建gitlab
- 1139 First Contact PAT (Advanced Level)
- Educational Codeforces Round 75 (Rated for Div. 2)
- 属性动画 补间动画 帧动画 基本使用案例 MD
- springcolud 的学习(四)服务治理. Eureka
- bat命令复制文件
- AWS--Lamdba
- Python进阶----进程间数据隔离, join阻塞等待, 进程属性, 僵尸进程和孤儿进程, 守护进程
- Linux:检查当前运行级别的五种方法
- 浅谈JS中 var let const 变量声明