Spark Dataset DataFrame 操作

相关博文参考

sparkSQL实战详解

Spark-SQL之DataFrame操作大全

sparksql中dataframe的用法

import groovy.sql.DataSet
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Row, SparkSession} object sparkSession {
case class Person(name:String,age:BigInt)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[4]")
.appName("Spark SQL Example")
.getOrCreate()
//读取json文件,返回一个dataframe;
val df = spark.read.json("D:\\people.json")
import spark.implicits._
//查看DataFrame中的内容,默认显示20行;
df.show()
//----------------------dataframe和dataSet相互转换-----------------------------------
// val df:DataFrame = DataSet[Row] //dataframe和dataSet的关系;
val ds = df.as[Person]
val ds_1 = df.toJSON
val df_new = ds.toDF()
//----------------------------------------------------------------------------------
//打印DataFrame的Schema信息;可以理解为模型;
df.printSchema()
//查看DataFrame部分列中的内容,age
df.select("age").show()
//查看DataFrame部分列中的内容,age+1
df.select($"name",$"age"+1).show
//另外一种写法;查看age和name;
df.select(df("age"),df("name")).show()
//过滤age大于20的,显示10行;
df.filter($"age" > 20).show(10)
//统计年龄大于20的人数;
df.filter(df("age") > 20).count()
//按年龄进行分组,统计人数;
df.groupBy("name").count().show()
//collect方法会将df中的所有数据都获取到,并返回一个Row类型的Array对象
df.collect().foreach(println)
//和上面的类似,返回一个Row类型的List集合;
val list = df.collectAsList()
//获取指定字段的统计信息;
df.describe("name","age").show()
//跟sql语句的where条件一样;
df.where("age = 18 and name = 'jason'").show()
//根据某个字段筛选;
df.filter("name = 'jason'").show()
//获取指定的字段可以对字段做一些特殊的操作,可以写别名;
df.selectExpr("age","name as n").show()
//获取指定的字段,注意这个一次只能获取一个字段;
val age = df.col("age")
println(age)
//和上面的一样,也是获取某个字段;
val name = df.apply("name")
println(name)
//删除指定的字段;
df.drop("age").show()
//跟sql的limit一样,显示前几行;
df.limit(5).show()
//根据某个字段排序;
df.orderBy(df("age").desc).show()
//按照partition进行排序;
df.sortWithinPartitions("age").show()
//跟sql里面的一样,根据某个字段分组;
val groupby_name = df.groupBy("name").count().show()
//结合groupby做一些聚合操作;
df.groupBy("age").max().show()
//去重;
df.distinct().show()
//指定字段去重;
df.dropDuplicates("name").show()
//聚合操作;
df.agg("age"-> "max","age"-> "min").show()
//对结果叠加;
df.union(df).show()
//跟sql里面的join一样支持,left join,right join,inner join,这个操作非常的丰富,这里就不在一一列举了;
df.join(df,Seq("age"),"left").show()
//获取两个df中相同的数据,相当于inner join;
df.intersect(df).show()
//对指定字段重命名;
df.withColumnRenamed("name","name1").show()
//增加新的字段,默认显示为null
df.withColumn("name1",df("age")).show()
//前几天有人问我增加新的字段显示为0,怎么写?选择一列数值类型的乘以0就可以了;
df.withColumn("name2",df("age")*0).show()
}
}

一、Spark2 Dataset DataFrame空值null,NaN判断和处理

1.1 显示前10条数据

val data1 = data.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data1.limit(10).show
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0| null| 27| null| no| 4| 14| 6| null|
| 0| null| 32| null| yes| 1| 12| 1| null|
| 0| null| 57| null| yes| 5| 18| 6| null|
| 0| null| 22| null| no| 2| 17| 6| null|
| 0| null| 32| null| no| 2| 17| 5| null|
| 0|female| 22| null| no| 2| 12| 1| null|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0|female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+------+---+------------+--------+-------------+---------+----------+------+

1.2 删除所有列的空值和NaN

val resNull=data1.na.drop()
resNull.limit(10).show()
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0|female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
| 0| male| 37| 15| yes| 2| 20| 7| 2|
| 0| male| 27| 4| yes| 4| 18| 6| 4|
| 0| male| 47| 15| yes| 5| 17| 6| 4|
| 0|female| 22| 1.5| no| 2| 17| 5| 4|
| 0|female| 27| 4| no| 4| 14| 5| 4|
| 0|female| 37| 15| yes| 1| 17| 5| 5|
+-------+------+---+------------+--------+-------------+---------+----------+------+

1.3 删除某列的空值和NaN

 //删除某列的空值和NaN
val res=data1.na.drop(Array("gender","yearsmarried"))

1.4 删除某列的非空且非NaN的低于10的

// 删除某列的非空且非NaN的低于10的
data1.na.drop(10,Array("gender","yearsmarried"))

1.5 填充所有空值的列

 //填充所有空值的列
val res123=data1.na.fill("wangxiao123")
res123.limit(10).show()
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
|affairs| gender|age|yearsmarried|children|religiousness|education|occupation| rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0|wangxiao123| 27| wangxiao123| no| 4| 14| 6|wangxiao123|
| 0|wangxiao123| 32| wangxiao123| yes| 1| 12| 1|wangxiao123|
| 0|wangxiao123| 57| wangxiao123| yes| 5| 18| 6|wangxiao123|
| 0|wangxiao123| 22| wangxiao123| no| 2| 17| 6|wangxiao123|
| 0|wangxiao123| 32| wangxiao123| no| 2| 17| 5|wangxiao123|
| 0| female| 22| wangxiao123| no| 2| 12| 1|wangxiao123|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0| female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+

1.6 对指定的列空值填充

 //对指定的列空值填充
val res2=data1.na.fill(value="wangxiao111",cols=Array("gender","yearsmarried") )
res2.limit(10).show()
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
|affairs| gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0|wangxiao111| 27| wangxiao111| no| 4| 14| 6| null|
| 0|wangxiao111| 32| wangxiao111| yes| 1| 12| 1| null|
| 0|wangxiao111| 57| wangxiao111| yes| 5| 18| 6| null|
| 0|wangxiao111| 22| wangxiao111| no| 2| 17| 6| null|
| 0|wangxiao111| 32| wangxiao111| no| 2| 17| 5| null|
| 0| female| 22| wangxiao111| no| 2| 12| 1| null|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0| female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+

1.7 查询空值列

 //查询空值列
data1.filter("gender is null").select("gender").limit(10).show
+------+
|gender|
+------+
| null|
| null|
| null|
| null|
| null|
+------+
 data1.filter( data1("gender").isNull ).select("gender").limit(10).show
+------+
|gender|
+------+
| null|
| null|
| null|
| null|
| null|
+------+

1.8 查询非空列

data1.filter("gender is not null").select("gender").limit(10).show
+------+
|gender|
+------+
| male|
|female|
| male|
|female|
| male|
| male|
| male|
| male|
|female|
|female|
+------+
data1.filter("gender<>''").select("gender").limit(10).show
+------+
|gender|
+------+
| male|
|female|
| male|
|female|
| male|
| male|
| male|
| male|
|female|
|female|
+------+

二、Dataset行列操作和执行计划

Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]

Dataset是“懒惰”的,只在执行行动操作时触发计算。本质上,数据集表示一个逻辑计划,该计划描述了产生数据所需的计算。当执行行动操作时,Spark的查询优化程序优化逻辑计划,并生成一个高效的并行和分布式物理计划。

2.1 常用包

import scala.math._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrameStatFunctions

2.2 创建SparkSession,并导入示例数据

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._ val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
(0, "male", 37, 10, "no", 3, 18, 7, 4),
(0, "female", 27, 4, "no", 4, 14, 6, 4),
(0, "female", 32, 15, "yes", 1, 12, 1, 4),
(0, "male", 57, 15, "yes", 5, 18, 6, 5),
(0, "male", 22, 0.75, "no", 2, 17, 6, 3),
(0, "female", 32, 1.5, "no", 2, 17, 5, 5),
(0, "female", 22, 0.75, "no", 2, 12, 1, 3),
(0, "male", 57, 15, "yes", 2, 14, 4, 4),
(0, "female", 32, 15, "yes", 4, 16, 1, 2),
(0, "male", 22, 1.5, "no", 4, 14, 4, 5)) val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating") data.printSchema()
root
|-- affairs: double (nullable = false)
|-- gender: string (nullable = true)
|-- age: double (nullable = false)
|-- yearsmarried: double (nullable = false)
|-- children: string (nullable = true)
|-- religiousness: double (nullable = false)
|-- education: double (nullable = false)
|-- occupation: double (nullable = false)
|-- rating: double (nullable = false)

2.3 操作指定的列和行

// 在Spark-shell中展示,前n条记录
data.show(7)
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0| male|37.0| 10.0| no| 3.0| 18.0| 7.0| 4.0|
| 0.0|female|27.0| 4.0| no| 4.0| 14.0| 6.0| 4.0|
| 0.0|female|32.0| 15.0| yes| 1.0| 12.0| 1.0| 4.0|
| 0.0| male|57.0| 15.0| yes| 5.0| 18.0| 6.0| 5.0|
| 0.0| male|22.0| 0.75| no| 2.0| 17.0| 6.0| 3.0|
| 0.0|female|32.0| 1.5| no| 2.0| 17.0| 5.0| 5.0|
| 0.0|female|22.0| 0.75| no| 2.0| 12.0| 1.0| 3.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 7 rows // 取前n条记录
val data3=data.limit(5) // 过滤
data.filter("age>50 and gender=='male' ").show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0| male|57.0| 15.0| yes| 5.0| 18.0| 6.0| 5.0|
| 0.0| male|57.0| 15.0| yes| 2.0| 14.0| 4.0| 4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+ // 数据框的所有列 val columnArray=data.columns
columnArray: Array[String] = Array(affairs, gender, age, yearsmarried, children, religiousness, education, occupation, rating) // 查询某些列的数据
data.select("gender", "age", "yearsmarried", "children").show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
| male|37.0| 10.0| no|
|female|27.0| 4.0| no|
|female|32.0| 15.0| yes|
+------+----+------------+--------+
only showing top 3 rows val colArray=Array("gender", "age", "yearsmarried", "children")
colArray: Array[String] = Array(gender, age, yearsmarried, children) data.selectExpr(colArray:_*).show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
| male|37.0| 10.0| no|
|female|27.0| 4.0| no|
|female|32.0| 15.0| yes|
+------+----+------------+--------+
only showing top 3 rows // 操作指定的列,并排序
// data.selectExpr("gender", "age+1","cast(age as bigint)").orderBy($"gender".desc, $"age".asc).show
data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc).show
+------+----+----+
|gender|age1|age2|
+------+----+----+
| male|23.0| 22|
| male|23.0| 22|
| male|38.0| 37|
| male|58.0| 57|
| male|58.0| 57|
|female|23.0| 22|
|female|28.0| 27|
|female|33.0| 32|
|female|33.0| 32|
|female|33.0| 32|
+------+----+----+

2.4 查看SparkSQL逻辑和物理执行计划

val data4=data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc)
data4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gender: string, age1: double ... 1 more field] // 查看物理执行计划
data4.explain()
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
+- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
+- LocalTableScan [gender#20, age1#135, age2#136L, age#21] // 查看逻辑和物理执行计划
data4.explain(extended=true)
== Parsed Logical Plan ==
'Sort ['gender DESC, 'age ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L]
+- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#2
6, _9#17 AS rating#27] +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17] == Analyzed Logical Plan ==
gender: string, age1: double, age2: bigint
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L, age#21]
+- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupatio
n#26, _9#17 AS rating#27] +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17] == Optimized Logical Plan ==
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
+- LocalRelation [gender#20, age1#135, age2#136L, age#21] == Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
+- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
+- LocalTableScan [gender#20, age1#135, age2#136L, age#21]

三、Dataset去重、差集、交集

import org.apache.spark.sql.functions._

3.1 对整个DataFrame的数据去重

// 对整个DataFrame的数据去重
data.distinct()
data.dropDuplicates()

3.2 对指定列的去重

// 对指定列的去重
val colArray=Array("affairs", "gender")
data.dropDuplicates(colArray)
//data.dropDuplicates("affairs", "gender")

3.3 差集

 val df=data.filter("gender=='male' ")
// data与df的差集
data.except(df).show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0|female|32.0| 15.0| yes| 1.0| 12.0| 1.0| 4.0|
| 0.0|female|32.0| 1.5| no| 2.0| 17.0| 5.0| 5.0|
| 0.0|female|32.0| 15.0| yes| 4.0| 16.0| 1.0| 2.0|
| 0.0|female|22.0| 0.75| no| 2.0| 12.0| 1.0| 3.0|
| 0.0|female|27.0| 4.0| no| 4.0| 14.0| 6.0| 4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+

3.4 交集


// data与df的交集
data.intersect(df)

四、Dataset聚合操作

data.groupBy("gender").agg(count($"age"),max($"age").as("maxAge"), avg($"age").as("avgAge")).show
+------+----------+------+------+
|gender|count(age)|maxAge|avgAge|
+------+----------+------+------+
|female| 5| 32.0| 29.0|
| male| 5| 57.0| 39.0|
+------+----------+------+------+ data.groupBy("gender").agg("age"->"count","age" -> "max", "age" -> "avg").show
+------+----------+--------+--------+
|gender|count(age)|max(age)|avg(age)|
+------+----------+--------+--------+
|female| 5| 32.0| 29.0|
| male| 5| 57.0| 39.0|
+------+----------+--------+--------+

4.1 DataFrame分组统计信息,groupBy,agg算子

亲测
groupBy聚合算子
people.unionAll(newPeople).groupBy(col("name")).count.show
或者
people.unionAll(newPeople).groupBy($"name").count.show
或者
people.unionAll(newPeople).groupBy("name").count.show 调用DataFrame的toDF方法,重新命名全部列名,增加列名的可读性
people.groupBy("depId").agg(Map("age" -> "max", "gender" -> "count"))
.toDF("depId","maxAge","countGender").show 使用函数式编程方式对前后两个合并进行分组统计并显示结果
people.unionAll(newPeople).groupBy($"name").count.filter($"count" < 2).show 使用groupBy方法将合并后的DataFrame按照"name" 列进行分组,
得到GroupData类的实例,实例会自动带上分组的列,以及"count" 列。 GroupData类在spark2.0.x 版本改为RelationalGroupedDataset

五、Dataset之视图与SQL

// 创建视图
data.createOrReplaceTempView("Affairs") val df1 = spark.sql("SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25")
df1: org.apache.spark.sql.DataFrame = [affairs: double, gender: string ... 7 more fields] // 子查询
val df2 = spark.sql("select gender, age,rating from ( SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25 ) t ")
df2: org.apache.spark.sql.DataFrame = [gender: string, age: double ... 1 more field] df2.show
+------+----+------+
|gender| age|rating|
+------+----+------+
| male|22.0| 3.0|
|female|22.0| 3.0|
| male|22.0| 5.0|
+------+----+------+

六、Dataset之collect_set与collect_list

collect_set去除重复元素;collect_list不去除重复元素

select gender,

concat_ws(’,’, collect_set(children)),

concat_ws(’,’, collect_list(children))

from Affairs

group by gender

// 创建视图
data.createOrReplaceTempView("Affairs") val df3= spark.sql("select gender,concat_ws(',',collect_set(children)),concat_ws(',',collect_list(children)) from Affairs group by gender")
df3: org.apache.spark.sql.DataFrame = [gender: string, concat_ws(,, collect_set(children)): string ... 1 more field] df3.show // collect_set去除重复元素;collect_list不去除重复元素
+------+-----------------------------------+------------------------------------+
|gender|concat_ws(,, collect_set(children))|concat_ws(,, collect_list(children))|
+------+-----------------------------------+------------------------------------+
|female| no,yes| no,yes,no,no,yes|
| male| no,yes| no,yes,no,yes,no|
+------+-----------------------------------+------------------------------------+

七、Dataset多维度统计cube与rollup

val df6 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by Cube(gender,children) order by 1,2")
df6.show
+------+--------+--------+--------+----------+
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
| null| null| 57.0| 34.0| 10|
| null| no| 37.0| 27.0| 6|
| null| yes| 57.0| 44.5| 4|
|female| null| 32.0| 29.0| 5|
|female| no| 32.0| 27.0| 3|
|female| yes| 32.0| 32.0| 2|
| male| null| 57.0| 39.0| 5|
| male| no| 37.0| 27.0| 3|
| male| yes| 57.0| 57.0| 2|
+------+--------+--------+--------+----------+ val df7 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by rollup(gender,children) order by 1,2") df7.show
+------+--------+--------+--------+----------+
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
| null| null| 57.0| 34.0| 10|
|female| null| 32.0| 29.0| 5|
|female| no| 32.0| 27.0| 3|
|female| yes| 32.0| 32.0| 2|
| male| null| 57.0| 39.0| 5|
| male| no| 37.0| 27.0| 3|
| male| yes| 57.0| 57.0| 2|
+------+--------+--------+--------+----------+

八、Dataset分析函数–排名函数row_number,rank,dense_rank,percent_rank

select gender,

age,

row_number() over(partition by gender order by age) as rowNumber,

rank() over(partition by gender order by age) as ranks,

dense_rank() over(partition by gender order by age) as denseRank,

percent_rank() over(partition by gender order by age) as percentRank

from Affairs

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._ val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
(0, "male", 37, 10, "no", 3, 18, 7, 4),
(0, "female", 27, 4, "no", 4, 14, 6, 4),
(0, "female", 32, 15, "yes", 1, 12, 1, 4),
(0, "male", 57, 15, "yes", 5, 18, 6, 5),
(0, "male", 22, 0.75, "no", 2, 17, 6, 3),
(0, "female", 32, 1.5, "no", 2, 17, 5, 5),
(0, "female", 22, 0.75, "no", 2, 12, 1, 3),
(0, "male", 57, 15, "yes", 2, 14, 4, 4),
(0, "female", 32, 15, "yes", 4, 16, 1, 2),
(0, "male", 22, 1.5, "no", 4, 14, 4, 5),
(0, "male", 37, 15, "yes", 2, 20, 7, 2),
(0, "male", 27, 4, "yes", 4, 18, 6, 4),
(0, "male", 47, 15, "yes", 5, 17, 6, 4),
(0, "female", 22, 1.5, "no", 2, 17, 5, 4),
(0, "female", 27, 4, "no", 4, 14, 5, 4),
(0, "female", 37, 15, "yes", 1, 17, 5, 5),
(0, "female", 37, 15, "yes", 2, 18, 4, 3),
(0, "female", 22, 0.75, "no", 3, 16, 5, 4),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 27, 10, "yes", 2, 14, 1, 5),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 27, 10, "yes", 4, 16, 5, 4),
(0, "female", 32, 10, "yes", 3, 14, 1, 5),
(0, "male", 37, 4, "yes", 2, 20, 6, 4)) val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating") data.printSchema() // 创建视图
data.createOrReplaceTempView("Affairs") val s1="row_number() over(partition by gender order by age) as rowNumber,"
val s2="rank() over(partition by gender order by age) as ranks,"
val s3="dense_rank() over(partition by gender order by age) as denseRank,"
val s4="percent_rank() over(partition by gender order by age) as percentRank"
val df8=spark.sql("select gender,age,"+s1+s2+s3+s4+" from Affairs") df8.show(50)
+------+----+---------+-----+---------+------------------+
|gender| age|rowNumber|ranks|denseRank| percentRank|
+------+----+---------+-----+---------+------------------+
|female|22.0| 1| 1| 1| 0.0|
|female|22.0| 2| 1| 1| 0.0|
|female|22.0| 3| 1| 1| 0.0|
|female|22.0| 4| 1| 1| 0.0|
|female|22.0| 5| 1| 1| 0.0|
|female|22.0| 6| 1| 1| 0.0|
|female|27.0| 7| 7| 2| 0.4|
|female|27.0| 8| 7| 2| 0.4|
|female|27.0| 9| 7| 2| 0.4|
|female|27.0| 10| 7| 2| 0.4|
|female|32.0| 11| 11| 3|0.6666666666666666|
|female|32.0| 12| 11| 3|0.6666666666666666|
|female|32.0| 13| 11| 3|0.6666666666666666|
|female|32.0| 14| 11| 3|0.6666666666666666|
|female|37.0| 15| 15| 4|0.9333333333333333|
|female|37.0| 16| 15| 4|0.9333333333333333|
| male|22.0| 1| 1| 1| 0.0|
| male|22.0| 2| 1| 1| 0.0|
| male|27.0| 3| 3| 2| 0.25|
| male|37.0| 4| 4| 3| 0.375|
| male|37.0| 5| 4| 3| 0.375|
| male|37.0| 6| 4| 3| 0.375|
| male|47.0| 7| 7| 4| 0.75|
| male|57.0| 8| 8| 5| 0.875|
| male|57.0| 9| 8| 5| 0.875|
+------+----+---------+-----+---------+------------------+

九、DataSet 创建新行之flatMap

val dfList = List(("Hadoop", "Java,SQL,Hive,HBase,MySQL"), ("Spark", "Scala,SQL,DataSet,MLlib,GraphX"))
dfList: List[(String, String)] = List((Hadoop,Java,SQL,Hive,HBase,MySQL), (Spark,Scala,SQL,DataSet,MLlib,GraphX)) case class Book(title: String, words: String) val df=dfList.map{p=>Book(p._1,p._2)}.toDS()
df: org.apache.spark.sql.Dataset[Book] = [title: string, words: string] df.show
+------+--------------------+
| title| words|
+------+--------------------+
|Hadoop|Java,SQL,Hive,HBa...|
| Spark|Scala,SQL,DataSet...|
+------+--------------------+ df.flatMap(_.words.split(",")).show
+-------+
| value|
+-------+
| Java|
| SQL|
| Hive|
| HBase|
| MySQL|
| Scala|
| SQL|
|DataSet|
| MLlib|
| GraphX|
+-------+

最新文章

  1. 安装完成后在命令行运行bash时报错0x80070057
  2. 【linux】学习6
  3. hdu1115(计算多边形几何重心)
  4. Global::pickClassMethod_DNT
  5. 北大ACM(POJ1002-487-3279)
  6. 使用C语言实现二维,三维绘图算法(2)-解析曲面的显示
  7. 超大型 LED 显示屏
  8. linux内核奇遇记之md源代码解读之四
  9. Runtime 在IOS中的详细使用
  10. Mac苹果电脑加密视频播放器使用教程
  11. #include &lt;boost/scoped_ptr.hpp&gt;
  12. CSS或者JS实现鼠标悬停显示另一元素
  13. String to Double出现误差
  14. sql定期移植数据的存储过程
  15. [傻瓜版] Redis在Windows下的开发环境配置步骤
  16. Splay的初步学习
  17. 校内模拟赛 虫洞(by NiroBC)
  18. js的匿名函数 和普通函数
  19. Bootstrap(8) 路径分页标签和徽章组件
  20. Android_如何隐藏应用程序的图标

热门文章

  1. python之scrapy篇(一)
  2. 关于Byte(1) 与int (1) 比较原理
  3. 一些php文件函数
  4. .NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ 工作队列和交换机)--学习笔记
  5. 如何使用蓝湖设计稿同时适配PC及移动端
  6. 经典项目管理 OR 敏捷项目管理,我该怎么选?
  7. Debian9 升级至 Debian10
  8. 爬虫-urllib模块的使用
  9. NAS基础知识
  10. ASP.NET Core错误处理中间件[2]: 开发者异常页面