Spark: Common RDD Transformation Operators, with Code Examples

Examples of commonly used Spark RDD transformations:

1. def map[U: ClassTag](f: T => U): RDD[U]  Applies the function to every element of the RDD and returns a new RDD of the results.

package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest extends App {
  val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  // map: multiply every element by 10
  val source = sc.parallelize(1 to 10)
  source.collect().foreach(e => print(e + " ")) // 1 2 3 4 5 6 7 8 9 10
  val sourceMap = source.map(_ * 10)
  sourceMap.collect().foreach(e => print(e + " ")) // 10 20 30 40 50 60 70 80 90 100
}

2. def filter(f: T => Boolean): RDD[T]  Returns a new RDD containing only the elements for which the given predicate evaluates to true.

// filter: keep only elements less than 5
val source = sc.parallelize(1 to 10)
source.collect().foreach(e => print(e + " ")) // 1 2 3 4 5 6 7 8 9 10
val sourceMap = source.filter(_ < 5)
sourceMap.collect().foreach(e => print(e + " ")) // 1 2 3 4

3. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]  Applies the function to every element of the RDD, producing a collection for each element, then flattens all of those collections into a single RDD.

// flatMap: expand each element x into the sequence 1 to x
val source = sc.parallelize(1 to 5)
source.collect().foreach(e => print(e + " ")) // 1 2 3 4 5
val sourceMap = source.flatMap(x => 1 to x)
sourceMap.collect().foreach(e => print(e + " ")) // 1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

4. def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]  Applies the function once per partition of the RDD; the function receives an Iterator over the partition's elements and must return an Iterator.

package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // mapPartitions: run partitionsFun once per partition
    val source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
    source.collect().foreach(e => print(e + " ")) // (lucy,female) (jack,male) (jams,male)
    val sourceMap = source.mapPartitions(partitionsFun)
    sourceMap.collect().foreach(e => print(e + " ")) // jams jack
  }

  // Collect the names of all "male" records in one partition
  def partitionsFun(iter: Iterator[(String, String)]): Iterator[String] = {
    var males = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "male") => males = next._1 :: males
        case _ =>
      }
    }
    males.iterator
  }
}

5. def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]  Applies the function once per partition; the function receives the partition's index together with an Iterator over the partition's elements, and must return an Iterator.

package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // mapPartitionsWithIndex: tag each matching name with its partition index
    val source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
    source.collect().foreach(e => print(e + " ")) // (lucy,female) (jack,male) (jams,male)
    val sourceMap = source.mapPartitionsWithIndex(partitionsFunWithIndex)
    sourceMap.collect().foreach(e => print(e + " ")) // [1]jams [1]jack
  }

  // Collect the names of all "male" records in one partition, prefixed with the partition index
  def partitionsFunWithIndex(index: Int, iter: Iterator[(String, String)]): Iterator[String] = {
    var males = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "male") => males = "[" + index + "]" + next._1 :: males
        case _ =>
      }
    }
    males.iterator
  }
}

6. def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]  Returns a sampled subset of the RDD containing roughly fraction of the data, using seed as the random seed; withReplacement indicates whether to sample with replacement.

package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // sample: with replacement, fraction 0.4, seed 2
    val source = sc.parallelize(1 to 10)
    source.collect().foreach(e => print(e + " ")) // 1 2 3 4 5 6 7 8 9 10
    val sourceMap = source.sample(true, 0.4, 2)
    sourceMap.collect().foreach(e => print(e + " ")) // 1 2 2 (the result varies with the seed)
  }
}

7. def union(other: RDD[T]): RDD[T]  Merges the elements of the two RDDs into a new RDD.

// union: combine the two RDDs
val source = sc.parallelize(1 to 3)
source.collect().foreach(e => print(e + " ")) // 1 2 3
val rdd = sc.parallelize(6 to 9)
val sourceMap = source.union(rdd)
sourceMap.collect().foreach(e => print(e + " ")) // 1 2 3 6 7 8 9

8. def intersection(other: RDD[T]): RDD[T]  Returns a new RDD containing the intersection of the two RDDs.

// intersection: elements present in both RDDs
val source = sc.parallelize(1 to 8)
source.collect().foreach(e => print(e + " ")) // 1 2 3 4 5 6 7 8
val rdd = sc.parallelize(6 to 9)
val sourceMap = source.intersection(rdd)
sourceMap.collect().foreach(e => print(e + " ")) // 6 8 7

9. def distinct(): RDD[T]  Removes duplicate elements from the RDD and returns a new RDD.

// distinct: remove duplicates
val source = sc.parallelize(List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
source.collect().foreach(e => print(e + " ")) // 1 1 2 2 3 3 4 4 5 5
val sourceMap = source.distinct()
sourceMap.collect().foreach(e => print(e + " ")) // 4 2 1 3 5

10. def partitionBy(partitioner: Partitioner): RDD[(K, V)]  Repartitions the RDD using the given partitioner and returns a new RDD.

// partitionBy: repartition from 4 partitions down to 2
// (requires: import org.apache.spark.HashPartitioner)
val source = sc.parallelize(List((1, "111"), (2, "222"), (3, "333"), (4, "444")), 4)
source.collect().foreach(e => print(e + " "))
print("partitions: " + source.partitions.size) // (1,111) (2,222) (3,333) (4,444) partitions: 4
val sourceMap = source.partitionBy(new HashPartitioner(2))
sourceMap.collect().foreach(e => print(e + " "))
print("partitions: " + sourceMap.partitions.size) // (2,222) (4,444) (1,111) (3,333) partitions: 2

11. def reduceByKey(func: (V, V) => V): RDD[(K, V)]  Combines the values of all tuples that share the same key using func, and returns a new RDD.

// reduceByKey: sum the counts for each word
val source = sc.parallelize(List(("hello", 1), ("world", 1), ("hello", 1), ("world", 1)))
source.collect().foreach(e => print(e + " ")) // (hello,1) (world,1) (hello,1) (world,1)
val sourceMap = source.reduceByKey((x, y) => x + y)
sourceMap.collect().foreach(e => print(e + " ")) // (hello,2) (world,2)

12. def groupByKey(): RDD[(K, Iterable[V])]  Groups the values that share the same key, returning an RDD of (K, Iterable[V]) pairs.

// groupByKey: gather all values for each key
val source = sc.parallelize(List(("hello", 1), ("world", 1), ("hello", 1), ("world", 1)))
source.collect().foreach(e => print(e + " ")) // (hello,1) (world,1) (hello,1) (world,1)
val sourceMap = source.groupByKey()
sourceMap.collect().foreach(e => print(e + " ")) // (hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))

13. def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]  Aggregates values by key: within each partition, createCombiner turns the first value seen for a key into a combiner and mergeValue folds each subsequent value for that key into it; mergeCombiners then merges the per-partition combiners into the final result.

package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // combineByKey: compute each student's average score
    val scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99),
      ("james", 44))
    val input = sc.parallelize(scores)
    input.collect().foreach(e => print(e + " "))
    // (lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)
    // Build (sum, count) pairs per key with createCombiner, mergeValue, mergeCombiners
    val output = input.combineByKey(
      (v) => (v, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    output.collect().foreach(e => print(e + " ")) // (james,(109,2)) (jack,(176,2)) (lucy,(189,2))
    val result = output.map { case (key, value) => (key, value._1 / value._2.toDouble) }
    result.collect().foreach(e => print(e + " ")) // (james,54.5) (jack,88.0) (lucy,94.5)
  }
}

14. def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]  Within each partition, seqOp folds the partition's values for each key into the zero value; combOp then merges the per-partition results for each key into the final value. A sketch is shown below.
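The original stops here without an example, so the following is a minimal sketch in the style of the snippets above. It assumes the usual sc from example 1 and uses the overload that takes only the zero value (letting Spark pick the partitioner); the data and the max-then-sum aggregation are illustrative choices, not from the original.

// aggregateByKey sketch: per key, take the max within each partition (seqOp),
// then sum those per-partition maxima across partitions (combOp).
// Assumes sc is the SparkContext created as in example 1.
val source = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val sourceMap = source.aggregateByKey(0)((acc, v) => math.max(acc, v), (x, y) => x + y)
sourceMap.collect().foreach(e => print(e + " ")) // e.g. (b,3) (a,3) (c,12); ordering may vary

With two partitions, partition 0 holds ("a",3), ("a",2), ("c",4) and partition 1 holds ("b",3), ("c",6), ("c",8): seqOp yields a -> 3, c -> 4 in the first and b -> 3, c -> 8 in the second, and combOp sums the two results for c into 12.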
