1.map

一条一条读取

/**
 * Demonstrates `map`: every element of the RDD is transformed on its own,
 * here by prefixing each name with "Hello ", and the results are printed.
 *
 * NOTE(review): `sc` is not defined in this snippet; presumably it is a
 * SparkContext supplied by the surrounding scope (e.g. spark-shell) - confirm.
 */
def map(): Unit = {
  val names = List("张无忌", "赵敏", "周芷若")
  sc.parallelize(names)
    .map(person => s"Hello $person")
    .foreach(println)
}

2.flatMap

扁平化

/**
 * Demonstrates `flatMap`: each input line is split on spaces and the resulting
 * arrays are flattened into a single RDD of names, which are then greeted and
 * printed.
 *
 * NOTE(review): `sc` is not defined in this snippet; presumably it is a
 * SparkContext supplied by the surrounding scope (e.g. spark-shell) - confirm.
 */
def flatMap(): Unit = {
  val list = List("张无忌 赵敏","宋青书 周芷若")
  val listRDD = sc.parallelize(list)
  // One line yields several names; flatMap flattens them into one RDD[String].
  val nameRDD = listRDD
    .flatMap(line => line.split(" "))
    .map(name => s"Hello $name")
  nameRDD.foreach(println)
}

3.mapPartitions

一次读取一个分区数据

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/**
 * Demonstrates `mapPartitions`: the supplied function is invoked once per
 * partition and receives an iterator over that partition's elements.
 */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints the raw elements, then the same elements prefixed with "hello". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4, 5, 6)
      val rdd = spark.parallelize(list, 2)
      rdd.foreach(println)
      // Transform the partition iterator lazily instead of copying the whole
      // partition into a buffer first; this keeps memory use constant.
      val rdd2 = rdd.mapPartitions(iterator => iterator.map(n => s"hello$n"))
      rdd2.foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

4.mapPartitionsWithIndex

一次读取一个分区数据,并且知道是哪个分区的

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/**
 * Demonstrates `mapPartitionsWithIndex`: like `mapPartitions`, but the function
 * also receives the index of the partition it is processing.
 */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints every element as "<partitionIndex>_<element>". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4, 5, 6)
      val rdd = spark.parallelize(list, 2)
      // Map the iterator lazily rather than buffering the partition; string
      // interpolation avoids the deprecated Int + String concatenation.
      val rdd2 = rdd.mapPartitionsWithIndex { (index, iterator) =>
        iterator.map(n => s"${index}_$n")
      }
      rdd2.foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

5.reduce

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates the `reduce` action: folds all elements into one value. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Sums the numbers 1..6 with `reduce` and prints the total. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4, 5, 6)
      val rdd = spark.parallelize(list)
      val result = rdd.reduce(_ + _)
      println(result)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

6.reduceByKey

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `reduceByKey`: merges the values of each key with a function. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Sums the scores per key and prints each result as "<key>:<sum>". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))
      val rdd = spark.parallelize(list)
      val rdd2 = rdd.reduceByKey(_ + _)
      rdd2.foreach { case (key, total) => println(s"${key}:${total}") }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

7.union

合并,但不去重

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `union`: concatenates two RDDs without removing duplicates. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints every element of both lists; 3 and 4 appear twice. */
  def main(args: Array[String]): Unit = {
    try {
      val list1 = List(1, 2, 3, 4)
      val list2 = List(3, 4, 5, 6)
      val rdd1 = spark.parallelize(list1)
      val rdd2 = spark.parallelize(list2)
      rdd1.union(rdd2).foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

8.join

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `join`: pairs up the values of two pair RDDs by matching key. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Joins (id, name) with (id, score) and prints one line per id. */
  def main(args: Array[String]): Unit = {
    try {
      val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))
      val list2 = List((1, 99), (2, 98), (3, 97))
      val rdd1 = spark.parallelize(list1)
      val rdd2 = spark.parallelize(list2)
      // The joined element type is (id, (name, score)).
      val rdd3 = rdd1.join(rdd2)
      rdd3.foreach { case (id, (name, score)) =>
        println(s"学号:$id 姓名:$name 成绩:$score")
      }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

9.groupByKey

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `groupByKey`: collects all values that share a key. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Groups the people by key and prints each group on one line. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))
      val rdd1 = spark.parallelize(list)
      val rdd2 = rdd1.groupByKey()
      rdd2.foreach { case (menpai, members) =>
        // Every member is followed by a single space, matching the original
        // output format (including the trailing space).
        val people = members.map(member => s"$member ").mkString
        println(s"门派:${menpai}人员:$people")
      }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

10.cartesian

笛卡尔积

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `cartesian`: produces every pair drawn from two RDDs. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints each (letter, number) combination as "<letter>-><number>". */
  def main(args: Array[String]): Unit = {
    try {
      val list1 = List("A", "B")
      val list2 = List(1, 2, 3)
      val list1RDD = spark.parallelize(list1)
      val list2RDD = spark.parallelize(list2)
      list1RDD.cartesian(list2RDD).foreach { case (letter, number) =>
        println(s"${letter}->${number}")
      }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

11.filter

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `filter`: keeps only the elements that satisfy a predicate. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints the even numbers from 1..10 (with `print`, so no line breaks). */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      val listRDD = spark.parallelize(list)
      listRDD.filter(_ % 2 == 0).foreach(num => print(num))
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

12.distinct

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `distinct`: removes duplicate elements from an RDD. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints each distinct value of the list once. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 1, 2, 2, 3, 3, 4, 5)
      val rdd = spark.parallelize(list)
      rdd.distinct().foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

13.intersection

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `intersection`: keeps the elements present in both RDDs. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints the values that occur in both lists. */
  def main(args: Array[String]): Unit = {
    try {
      val list1 = List(1, 2, 3, 4)
      val list2 = List(3, 4, 5, 6)
      val list1RDD = spark.parallelize(list1)
      val list2RDD = spark.parallelize(list2)
      list1RDD.intersection(list2RDD).foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

14.coalesce

分区由多-->少

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `coalesce`: reduces the number of partitions of an RDD. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Creates a 3-partition RDD, coalesces it to 1 partition and prints it. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4, 5)
      spark.parallelize(list, 3)
        .coalesce(1)
        .foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

15.repartition

进行重分区

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `repartition`: redistributes an RDD over N partitions. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Creates a 1-partition RDD, repartitions it into 2 and prints it. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 2, 3, 4)
      val listRDD = spark.parallelize(list, 1)
      listRDD.repartition(2).foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

16.repartitionAndSortWithinPartitions

根据给定的partitioner重新分区,并在每个分区内部按key排序;性能比先repartition再在各分区内排序要高,因为排序被下推到了shuffle过程中。

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/**
 * Demonstrates `repartitionAndSortWithinPartitions`: repartitions a pair RDD
 * with the given partitioner and sorts the records of each partition by key.
 */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Prints every (key, value) pair as "<partitionIndex>_<pair>". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(1, 4, 55, 66, 33, 48, 23)
      val listRDD = spark.parallelize(list, 1)
      listRDD
        .map(num => (num, num)) // the operator is only available on key/value RDDs
        .repartitionAndSortWithinPartitions(new HashPartitioner(2))
        .mapPartitionsWithIndex(
          // Map lazily instead of buffering the partition; interpolation
          // avoids the deprecated Int + String concatenation.
          (index, iterator) => iterator.map(pair => s"${index}_$pair"),
          preservesPartitioning = false
        )
        .foreach(println)
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

17.cogroup

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/**
 * Demonstrates `cogroup`: for every key, gathers the values from each of the
 * participating RDDs into separate collections.
 */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Cogroups three pair RDDs and prints the key with its three value groups. */
  def main(args: Array[String]): Unit = {
    try {
      val list1 = List((1, "www"), (2, "bbs"))
      val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
      val list3 = List((1, "com"), (2, "com"), (3, "good"))
      val list1RDD = spark.parallelize(list1)
      val list2RDD = spark.parallelize(list2)
      val list3RDD = spark.parallelize(list3)
      // Element type: (key, (valuesFromList1, valuesFromList2, valuesFromList3)).
      list1RDD.cogroup(list2RDD, list3RDD).foreach { case (key, (first, second, third)) =>
        println(s"$key $first $second $third")
      }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

18.sortByKey

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `sortByKey`: sorts a pair RDD by its keys. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Sorts (score, name) pairs by descending score and prints "<name>-><score>". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))
      spark.parallelize(list)
        .sortByKey(ascending = false)
        .foreach { case (score, name) => println(s"${name}->${score}") }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

19.aggregateByKey

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/**
 * Demonstrates `aggregateByKey`: aggregates the values of each key starting
 * from a zero value, with one function applied within a partition and another
 * used to merge the per-partition results.
 */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Counts the comma-separated words and prints "<word>-><count>". */
  def main(args: Array[String]): Unit = {
    try {
      val list = List("you,jump", "i,jump")
      spark.parallelize(list)
        .flatMap(_.split(","))
        .map(word => (word, 1))
        // zero value 0; both the within-partition and the merge step are sums
        .aggregateByKey(0)(_ + _, _ + _)
        .foreach { case (word, count) => println(s"${word}->${count}") }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}
// NOTE(review): this snippet has no section heading and its first import line
// was truncated in the source; it repeats the groupByKey example.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

/** Demonstrates `groupByKey`: collects all values that share a key. */
object Demo {
  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  // Despite its name, `spark` is a SparkContext, not a SparkSession.
  val spark: SparkContext = new SparkContext(conf)

  /** Groups the people by key and prints each group on one line. */
  def main(args: Array[String]): Unit = {
    try {
      val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))
      val rdd1 = spark.parallelize(list)
      val rdd2 = rdd1.groupByKey()
      rdd2.foreach { case (menpai, members) =>
        // Every member is followed by a single space, matching the original
        // output format (including the trailing space).
        val people = members.map(member => s"$member ").mkString
        println(s"门派:${menpai}人员:$people")
      }
    } finally {
      spark.stop() // release the context even if a job fails
    }
  }
}

最新文章

  1. IOS开发之Bug--使用xib的自动布局和代码中修改遇到的bug
  2. miterLimit和lineJoin属性
  3. JS基础知识
  4. C# List结果集排序
  5. python中isort的使用
  6. hdu 1024 Max Sum Plus Plus
  7. redis在centOS的安装
  8. MyEclipse简单设置
  9. 将EXCEL中的列拼接成SQL insert插入语句
  10. Oracle EBS-SQL (BOM-3):检查期间新增Bom数量.sql
  11. CF 316C2(Tidying Up-二分图最大边权)
  12. [HMLY]9.深入浅出-iOS Reactive Cocoa的常见用法
  13. 简单介绍移动端CSS3单位rem的用法
  14. C++ 窗口可改风格
  15. Redis Sentinel 机制与用法(二)
  16. 2456: mode
  17. 来腾讯云开发者实验室 学习.NET
  18. C# DataTable,DataSet,IList,IEnumerable 互转扩展属性
  19. Java集合-ArrayList源码解析-JDK1.8
  20. Opencv-python画图基础知识

热门文章

  1. ☆ [WC2006] 水管局长 「LCT动态维护最小生成树」
  2. jenkins系列之jenkins job
  3. mac安装postman
  4. URL和URI以及两者的区别和联系
  5. Servlet中转发和重定向的路径问题【转】
  6. 深入学习c++(虚函数遇到析构函数就退化了)
  7. [数学笔记Mathematical Notes]2-一个带对数的积分不等式
  8. Java部分概念理解
  9. LINUX常用操作快捷方式
  10. library 显示所有的数据