Spark operators
2024-08-21 15:11:58
1.map
Reads and transforms the elements one at a time.
def map(): Unit = {
  val list = List("张无忌", "赵敏", "周芷若")
  // sc is an existing SparkContext (e.g. the one provided by spark-shell)
  val listRDD = sc.parallelize(list)
  val nameRDD = listRDD.map(name => "Hello " + name)
  nameRDD.foreach(name => println(name))
}
2.flatMap
Flattens nested results: each input element can map to zero or more output elements.
def flatMap(): Unit = {
  val list = List("张无忌 赵敏", "宋青书 周芷若")
  val listRDD = sc.parallelize(list)
  val nameRDD = listRDD.flatMap(line => line.split(" ")).map(name => "Hello " + name)
  nameRDD.foreach(name => println(name))
}
3.mapPartitions
Processes the data one whole partition at a time.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6)
    val rdd = spark.parallelize(list, 2)
    rdd.foreach(println)
    val rdd2 = rdd.mapPartitions(iterator => {
      val newList = new ListBuffer[String]
      while (iterator.hasNext) {
        newList.append("hello" + iterator.next())
      }
      newList.toIterator
    })
    rdd2.foreach(name => println(name))
  }
}
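The ListBuffer above materializes a whole partition in memory before returning it. Since the function both receives and returns an Iterator, the same output can be produced lazily; a shorter equivalent sketch, reusing the rdd defined above:

// Lazy equivalent of the ListBuffer version: nothing is buffered per partition.
val rdd3 = rdd.mapPartitions(iterator => iterator.map(num => "hello" + num))
rdd3.foreach(println)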
4.mapPartitionsWithIndex
Processes one partition at a time and also exposes that partition's index.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6)
    val rdd = spark.parallelize(list, 2)
    val rdd2 = rdd.mapPartitionsWithIndex((index, iterator) => {
      val newList = new ListBuffer[String]
      while (iterator.hasNext) {
        newList.append(index + "_" + iterator.next())
      }
      newList.toIterator
    })
    rdd2.foreach(name => println(name))
  }
}
5.reduce
An action that aggregates all elements into a single value on the driver.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6)
    val rdd = spark.parallelize(list)
    val result = rdd.reduce((x, y) => x + y)
    println(result)
  }
}
6.reduceByKey
Combines the values that share a key; partial aggregation happens on the map side before the shuffle.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))
    val rdd = spark.parallelize(list)
    val rdd2 = rdd.reduceByKey(_ + _)
    rdd2.foreach(tuple => println(tuple._1 + ":" + tuple._2))
  }
}
7.union
Concatenates two RDDs without removing duplicates.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list1 = List(1, 2, 3, 4)
    val list2 = List(3, 4, 5, 6)
    val rdd1 = spark.parallelize(list1)
    val rdd2 = spark.parallelize(list2)
    rdd1.union(rdd2).foreach(println)
  }
}
8.join
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))
    val list2 = List((1, 99), (2, 98), (3, 97))
    val rdd1 = spark.parallelize(list1)
    val rdd2 = spark.parallelize(list2)
    val rdd3 = rdd1.join(rdd2)
    rdd3.foreach(tuple => {
      val id = tuple._1
      val new_tuple = tuple._2
      val name = new_tuple._1
      val score = new_tuple._2
      println("学号:" + id + " 姓名:" + name + " 成绩:" + score)
    })
  }
}
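join keeps only the keys present in both RDDs. If every entry from rdd1 should appear even when rdd2 has no matching score, a left outer join can be used instead; a minimal sketch against the same rdd1 and rdd2 (rdd4 is just an illustrative name):

// Keys missing from rdd2 show up as None instead of being dropped.
val rdd4 = rdd1.leftOuterJoin(rdd2)
rdd4.foreach { case (id, (name, scoreOpt)) =>
  println("学号:" + id + " 姓名:" + name + " 成绩:" + scoreOpt.getOrElse("无"))
}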
9.groupByKey
Collects all values that share a key into a single iterable.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))
    val rdd1 = spark.parallelize(list)
    val rdd2 = rdd1.groupByKey()
    rdd2.foreach(t => {
      val menpai = t._1
      val iterator = t._2.iterator
      var people = ""
      while (iterator.hasNext) people = people + iterator.next + " "
      println("门派:" + menpai + "人员:" + people)
    })
  }
}
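Since the values here are already strings, the same per-key roster can also be built with reduceByKey, which combines values on the map side before the shuffle instead of shipping every value to one place; a sketch reusing rdd1 from above:

// Same grouped output, but values are concatenated before being shuffled.
rdd1.reduceByKey(_ + " " + _)
    .foreach(t => println("门派:" + t._1 + "人员:" + t._2))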
10.cartesian
Cartesian product of two RDDs.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list1 = List("A", "B")
    val list2 = List(1, 2, 3)
    val list1RDD = spark.parallelize(list1)
    val list2RDD = spark.parallelize(list2)
    list1RDD.cartesian(list2RDD).foreach(t => println(t._1 + "->" + t._2))
  }
}
11.filter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val listRDD = spark.parallelize(list)
    listRDD.filter(num => num % 2 == 0).foreach(print(_))
  }
}
12.distinct
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 1, 2, 2, 3, 3, 4, 5)
    val rdd = spark.parallelize(list)
    rdd.distinct().foreach(println)
  }
}
13.intersection
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list1 = List(1, 2, 3, 4)
    val list2 = List(3, 4, 5, 6)
    val list1RDD = spark.parallelize(list1)
    val list2RDD = spark.parallelize(list2)
    list1RDD.intersection(list2RDD).foreach(println(_))
  }
}
14.coalesce
Reduces the number of partitions (many --> few).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5)
    spark.parallelize(list, 3).coalesce(1).foreach(println(_))
  }
}
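By default coalesce does not shuffle, it only merges existing partitions, which is why it suits reducing the partition count. To increase the count (or rebalance skew) the shuffle flag has to be enabled; a small sketch reusing the list above:

// With shuffle = true, coalesce behaves like repartition and can increase the partition count.
val grown = spark.parallelize(list, 2).coalesce(4, shuffle = true)
println(grown.getNumPartitions)   // prints 4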
15.repartition
Repartitions the RDD (this triggers a shuffle).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4)
    val listRDD = spark.parallelize(list, 1)
    listRDD.repartition(2).foreach(println(_))
  }
}
16.repartitionAndSortWithinPartitions
Repartitions the data with the given partitioner and sorts within each partition; this performs better than calling repartition and then sorting.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List(1, 4, 55, 66, 33, 48, 23)
    val listRDD = spark.parallelize(list, 1)
    listRDD.map(num => (num, num))
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitionsWithIndex((index, iterator) => {
        val listBuffer: ListBuffer[String] = new ListBuffer
        while (iterator.hasNext) {
          listBuffer.append(index + "_" + iterator.next())
        }
        listBuffer.iterator
      }, preservesPartitioning = false)
      .foreach(println(_))
  }
}
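For comparison, roughly the same result without this operator would be a partitionBy followed by a manual sort of each partition; repartitionAndSortWithinPartitions instead pushes the sort into the shuffle machinery, which is where the performance advantage mentioned above comes from. A rough two-step sketch:

// Two-step equivalent (sketch only): partition by key, then sort each partition in memory.
listRDD.map(num => (num, num))
  .partitionBy(new HashPartitioner(2))
  .mapPartitions(iter => iter.toList.sortBy(_._1).iterator, preservesPartitioning = true)
  .foreach(println)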
17.cogroup
For each key, groups together the value iterables from every RDD involved.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list1 = List((1, "www"), (2, "bbs"))
    val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
    val list3 = List((1, "com"), (2, "com"), (3, "good"))
    val list1RDD = spark.parallelize(list1)
    val list2RDD = spark.parallelize(list2)
    val list3RDD = spark.parallelize(list3)
    list1RDD.cogroup(list2RDD, list3RDD).foreach(tuple =>
      println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))
  }
}
18.sortByKey
Sorts a pair RDD by key; passing false sorts in descending order.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))
    spark.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))
  }
}
19.aggregateByKey
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

object Demo {
  val conf = new SparkConf().setAppName("Demo").setMaster("local")
  // val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  val spark = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val list = List("you,jump", "i,jump")
    spark.parallelize(list)
      .flatMap(_.split(","))
      .map((_, 1))
      .aggregateByKey(0)(_ + _, _ + _)
      .foreach(tuple => println(tuple._1 + "->" + tuple._2))
  }
}
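With _ + _ passed for both functions, aggregateByKey behaves just like reduceByKey. Its extra flexibility is that the within-partition function (seqOp) and the cross-partition function (combOp) may differ, and the result type may differ from the value type; a small sketch that collects each key's values into a list:

// seqOp prepends a value to a partition-local list; combOp concatenates the partial lists.
spark.parallelize(List(("武当", 99), ("少林", 97), ("武当", 89)), 2)
  .aggregateByKey(List.empty[Int])((acc, v) => v :: acc, (a, b) => a ::: b)
  .foreach(println)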