RDD operators fall into two categories: Transformations (lazy) and Actions (which trigger job execution).
An RDD does not hold the data it will compute; it only records the chain of transformations (which method was called, and which function was passed in).

Characteristics of an RDD Transformation:
1. Lazy: nothing is computed until an Action is invoked (see the sketch below)
2. It produces a new RDD
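A minimal sketch of the lazy behavior, assuming a local SparkContext named sc as in the examples that follow (the variable names here are illustrative only): defining the map does nothing; the work happens when the collect Action runs.

// Nothing is computed when the Transformation is defined...
val doubled = sc.parallelize(Array(1, 2, 3)).map { n =>
  println(s"computing $n")  // side effect runs only once an Action executes
  n * 2
}
// ...the job is triggered here, by the Action
val result = doubled.collect()  // in local mode prints "computing 1" etc., returns Array(2, 4, 6)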

package cn.rzlee.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TransformationOperation {

  def main(args: Array[String]): Unit = {
    // map()
    // filter()
    // flatMap()
    // groupByKey()
    // reduceByKey()
    // sortByKey()
    join()
  }

  // Multiply every element in the collection by 2
  def map(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD: RDD[Int] = sc.parallelize(numbers, 1)
    val doubledRDD: RDD[Int] = numberRDD.map(num => num * 2)
    doubledRDD.foreach(num => println(num))
  }

  // Keep only the even numbers in the collection
  def filter(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD: RDD[Int] = sc.parallelize(numbers, 1)
    val evenNumbersRdd = numberRDD.filter(num => num % 2 == 0)
    evenNumbersRdd.foreach(num => println(num))
  }

  // Split each line into words
  def flatMap(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val lineArray = Array("hello you", "just do it", "go go go")
    val lines = sc.parallelize(lineArray, 1)
    val words: RDD[String] = lines.flatMap(line => line.split(" "))
    words.foreach(word => println(word))
  }

  // Group the scores of each class
  def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val scoresList = Array(Tuple2("class1", 50), Tuple2("class1", 95), Tuple2("class2", 60), Tuple2("class2", 88))
    val scores: RDD[(String, Int)] = sc.parallelize(scoresList, 1)
    val groupedScores = scores.groupByKey()
    groupedScores.foreach(scored => {
      println(scored._1)
      scored._2.foreach(singleScore => println(singleScore))
      println("=====================================")
    })
  }

  // Compute the total score of each class
  def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val scoresList = Array(Tuple2("class1", 50), Tuple2("class1", 95), Tuple2("class2", 60), Tuple2("class2", 88))
    val scores: RDD[(String, Int)] = sc.parallelize(scoresList, 1)
    val totalScores: RDD[(String, Int)] = scores.reduceByKey(_ + _)
    totalScores.foreach(totalScore => println(totalScore._1 + " : " + totalScore._2))
  }

  // Sort students by score
  def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val scoreList = Array(Tuple2(90, "leo"), Tuple2(99, "kent"), Tuple2(80, "Jeo"), Tuple2(91, "Ben"), Tuple2(96, "Sam"))
    val scores: RDD[(Int, String)] = sc.parallelize(scoreList, 1)
    val sortedScores = scores.sortByKey(false)  // false: descending order
    sortedScores.foreach(student => println(student._2 + " : " + student._1))
  }

  // Print every student's score
  def join(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val studentsList = Array(Tuple2(1, "leo"), Tuple2(2, "Sam"), Tuple2(3, "kevin"))
    val scoresList = Array(Tuple2(1, 60), Tuple2(2, 70), Tuple2(3, 80))
    val students: RDD[(Int, String)] = sc.parallelize(studentsList, 1)
    val scores: RDD[(Int, Int)] = sc.parallelize(scoresList, 1)
    val studentScores: RDD[(Int, (String, Int))] = students.join(scores)
    studentScores.foreach(studentScore => {
      println("studentId: " + studentScore._1)
      println("studentName: " + studentScore._2._1)
      println("studentScore: " + studentScore._2._2)
      println("###################################################")
    })
  }

  // Print every student's score
  // cogroup is similar to a full outer join
  def cogroup(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
    val sc = new SparkContext(conf)
    val studentsList = Array(Tuple2(1, "leo"), Tuple2(2, "Sam"), Tuple2(3, "kevin"))
    val scoresList = Array(Tuple2(1, 60), Tuple2(2, 70), Tuple2(3, 80))
    val students: RDD[(Int, String)] = sc.parallelize(studentsList, 1)
    val scores: RDD[(Int, Int)] = sc.parallelize(scoresList, 1)
    val studentScores: RDD[(Int, (Iterable[String], Iterable[Int]))] = students.cogroup(scores)
    studentScores.foreach(studentScore => {
      println("studentId: " + studentScore._1)
      println("studentName: " + studentScore._2._1)
      println("studentScore: " + studentScore._2._2)
    })
  }
}
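For contrast, on the three-student data above, join and cogroup differ in output shape (hand-derived, not captured from a run): join emits one (key, (left, right)) pair per matching key, while cogroup emits one row per key with the values of each side collected into an Iterable, so a key present on only one side would still appear, with an empty Iterable on the other. That is why the comment above likens cogroup to a full outer join.

// join:    (1, (leo, 60)), (2, (Sam, 70)), (3, (kevin, 80))
// cogroup: (1, (CompactBuffer(leo), CompactBuffer(60))), (2, (CompactBuffer(Sam), CompactBuffer(70))), ...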

#union returns the union of two RDDs; note that the element types must match

val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
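Hand-derived expectation for the snippet above (not captured from a run): union keeps duplicates, which is why distinct is applied before sorting.

// rdd8.collect                          -> Array(5, 6, 4, 7, 1, 2, 3, 4)   (4 appears twice)
// rdd8.distinct.sortBy(x => x).collect  -> Array(1, 2, 3, 4, 5, 6, 7)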

#intersection returns the intersection of two RDDs

val rdd9 = rdd6.intersection(rdd7)
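Only 4 is present in both rdd6 and rdd7, so the expected result is:

// rdd9.collect -> Array(4)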

#join: note that pair RDDs are joined by key

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
val rdd3 = rdd1.join(rdd2)
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd3 = rdd1.rightOuterJoin(rdd2)
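The three variants differ in how unmatched keys are handled; the result element types below come straight from the pair-RDD API:

rdd1.join(rdd2)            // RDD[(String, (Int, Int))]: inner join, matching keys only
rdd1.leftOuterJoin(rdd2)   // RDD[(String, (Int, Option[Int]))]: every key of rdd1
rdd1.rightOuterJoin(rdd2)  // RDD[(String, (Option[Int], Int))]: every key of rdd2

Since "tom" appears twice in rdd2, the inner join emits two rows for it: ("tom", (1, 8)) and ("tom", (1, 2)).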

#cogroup is somewhat like a full outer join


val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

val rdd3 = rdd1.cogroup(rdd2)

println(rdd3.collect().toBuffer)
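Hand-derived output (ordering may vary; Spark typically renders each group as a CompactBuffer): every key from either side appears exactly once, with an empty group on the side that has no values for it.

// ArrayBuffer((tom,(CompactBuffer(1, 2),CompactBuffer(1))),
//             (jerry,(CompactBuffer(3),CompactBuffer(2))),
//             (shuke,(CompactBuffer(),CompactBuffer(2))),
//             (kitty,(CompactBuffer(2),CompactBuffer())))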

#cartesian computes the Cartesian product

val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)
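With 2 elements on the left and 3 on the right, the product has 2 x 3 = 6 pairs; beware that cartesian output grows multiplicatively with input size.

// rdd3.collect -> Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))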
