目录

1

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo1Sess {
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("Demo1Sess")
// 设置spark sql产生shuffle后默认的分区数 => 并行度
// 默认是 200
.config("spark.sql.shuffle.partitions",3)
.getOrCreate() // 从SparkSession获取SparkContext
// val sc: SparkContext = spark.sparkContext // json中每条数据都自带结构 可以直接转换成DF
val stuDF: DataFrame = spark
.read
.format("json")
.load("spark/data/students.json") stuDF.show() //默认显示20条 // 文本类的数据 默认是没有列名的 直接读进来是 _c0 _c1 _c2 ......
// 可以通过schema手动指定列名,空格隔开字段和字段类型
val stucsDF: DataFrame = spark
.read
.format("csv")
.schema("id String,name String,age Int,gender String,clazz String")
.load("scala/data/students.txt") stucsDF.show() // 直接将DataFrame注册成临时视图view
stucsDF.createOrReplaceTempView("stu") // sql的方式
val ageDF: DataFrame = spark.sql("select * from stu where age=22")
ageDF.show() // 同rdd一样,操作算子可以触发job // DSL 类SQL的方式 介于SQL和代码中间的API val dslDF: DataFrame = stucsDF.where("age=23")
.select("name", "age", "clazz")
dslDF.show() // 统计班级人数
stucsDF.groupBy("clazz")
.count()
.write
.mode(SaveMode.Overwrite)
.save("spark/data/clazz_cnt") // 保存的时候可以指定SaveMode
// Overwrite 覆盖
// Append 追加
// 默认以parquet形式保存 } }

2

import Practice.Student
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} object Demo2CreateDF {
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("Demo2CreateDF")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate() /**
* 1、读json数据
*/ val jsonDF: DataFrame = spark.read
.format("json")
.load("spark/data/students.json") // jsonDF.show() // 默认显示20条
// jsonDF.show(100) // 显示100条
// jsonDF.show(false) // 完全显示
// jsonDF.show() /**
* 2、读文本文件
*/ val csvDF: DataFrame = spark.read
.format("csv")
//csv 格式读取默认是以逗号分隔
.option("sep", ",")
.schema("id String,name String,age Int,gender String,clazz String")
.load("scala/data/students.txt") // csvDF.show() /**
* 3、JDBC 读取MySQL的一张表转换成 Spark SQL中的DF
*/ val jdbcDF: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/student")
.option("dbtable", "student")
.option("user", "root")
.option("password", "123456")
.load() // jdbcDF.show() // 将数据以parquet格式保存
// jdbcDF
// .write
// .mode(SaveMode.Overwrite)
// .parquet("spark/data/stu_parquet") /**
* 4、读取parquet文件
* 无法直接查看,默认会进行压缩,而且自带表结构,读取时不需要指定schema
* 默认使用snappy压缩方式进行压缩
*/ spark.read
.format("parquet")
.load("spark/data/stu_parquet")
// .show() // 将数据以orc格式保存
// jdbcDF.write.orc("spark/data/stu_orc") /**
* 5、读取ORC格式的文件
* 也会默认进行压缩,空间占用率最小,默认带有表结构,可以直接读取
*/ // spark
// .read
// .format("orc")
// .load("spark/data/stu_orc") /**
* 6、从RDD构建DF
*/ val stuRDD: RDD[String] = spark.sparkContext.textFile("scala/data/students.txt")
val stuRDD2: RDD[Student] = stuRDD.map(line => {
val splits: Array[String] = line.split(",")
val id: String = splits(0)
val name: String = splits(1)
val age: String = splits(2)
val gender: String = splits(3)
val clazz: String = splits(4)
Student(id, name, age, gender, clazz)
}) // 导入隐式转换
import spark.implicits._
val sDF: DataFrame = stuRDD2.toDF()
sDF.show() // DataFrame to RDD
val rdd: RDD[Row] = sDF.rdd
rdd.foreach(row=>{
val id: String = row.getAs[String]("id")
val name: String = row.getAs[String]("name")
println(s"$id,$name")
}) } case class Student(id:String,name:String,age: String, gender: String, clazz: String) }

3

import org.apache.spark.sql.{DataFrame, SparkSession}

object DFapi {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("DFapi")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate() import spark.implicits._ val stuDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("id String,name String,age String,gender String,clazz String")
.load("scala/data/students.txt") // 对多次使用的DF也可进行cache
stuDF.cache() // 过滤 where
// 过滤出 年龄 大于 23的学生
// DSL // 字符串表达式
stuDF.where("age>23") // 列表达式 (推荐),需要先导入隐式转换
stuDF.where($"age" > 23) // 使用filter加函数的方式进行过滤
stuDF.filter(row => {
val age: String = row.getAs[String]("age")
if (age.toInt > 23) {
true
}
else {
false
}
}) // select
stuDF.select($"id", $"name", $"age" + 100 as "newage") // 分组 groupBy
// 聚合
// 统计班级人数
stuDF.groupBy($"clazz")
.count().show() // 导入所有的sql函数
import org.apache.spark.sql.functions._
// 统计每个班的性别人数
stuDF.groupBy($"clazz", $"gender")
.agg(count($"gender"))
.show() // 统计班级人数(数据可能有重复)
stuDF.groupBy($"clazz")
.agg(countDistinct($"id") as "去重人数")
.show() // SQL 的方式
stuDF.createOrReplaceTempView("stu")
spark.sql(
"""
|select clazz,count(distinct id)
|from stu
|group by clazz
""".stripMargin
).show() // join
val scoreDF: DataFrame = spark.read
.format("csv")
.schema("sid String,sub_id String,score Int")
.load("scala/data/score.txt") // 当两张表的关联字段名字一样时
// 在这里直接指定 "id" 默认是inner join
// .join(scoreDF, "id")
// 可以将 "id" 放入 List 传入 再指定关联类型
// .join(scoreDF, List("id"), "left")
// 如果 关联字段不一样
stuDF.join(scoreDF,$"id"===$"sid","left").show() stuDF.unpersist() } }

4

import org.apache.spark.sql.{DataFrame, SparkSession}

object DianXin {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("DianXin")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate() val dxDF: DataFrame = spark.read
.format("csv")
.option("sep", ",")
.schema("mdn String,grid_id String,city_id String,county_id String,t String,start_time String,end_time String,date String")
.load("spark/data/dianxin_data") // 导入隐式转换
import spark.implicits._
// 导入Spark SQL中所有的函数
import org.apache.spark.sql.functions._ // 按城市统计每个区县的游客人数top3
dxDF.createOrReplaceTempView("dx") spark.sql(
"""
|select tt1.city_id,tt1.county_id,tt1.sum,tt1.rk
|from
|(select t1. city_id,t1.county_id,t1.sum,row_number() over (partition by county_id order by t1.sum desc) as rk
|from
|(select city_id,county_id,count(distinct mdn) as sum
|from
|dx
|group by city_id,county_id) t1) tt1
|where tt1.rk<3
|
""".stripMargin
).show() }
}

最新文章

  1. Reverse Core 第三部分 - 21章 - Windows消息钩取
  2. UIViewController生命周期
  3. 通过 JDBC 驱动程序使用大容量复制
  4. 使用HttpWebRequest和HtmlAgilityPack抓取网页(拒绝乱码,拒绝正则表达式)
  5. HDU1853 Cyclic Tour(最小费用最大流)
  6. 李洪强漫谈iOS开发[C语言-052]-for循环
  7. 【Avalon源码】iterator
  8. 【转】自动实时监控Windows2003服务器终端登录并发邮件和发短信通知
  9. 【转载】Powershell获取世纪互联Office365所有用户最后一次登录时间
  10. Android获取本机IP地址
  11. pureMVC简单示例及其原理讲解五(Facade)
  12. 搭建免费wifi,嗅探接入该wifi的所有网络信息
  13. nodejs-基础
  14. easygui控件介绍
  15. 比特币VS美元兑换查询网址
  16. NHibernate中Session的处理 线程不安全
  17. error: &#39;release&#39; is unavailable: not available in automatic reference counting,该怎么解决
  18. 设计模式之享元(flyweight)模式
  19. 《JavaScript高级程序设计》第5章 Object类型---Array---学习心得
  20. openldap完整版本搭建记录

热门文章

  1. 计算机网络-5-9-TCP拥塞控制
  2. 分布式缓存——Redis
  3. SpringBoot的.gitignore文件使用
  4. jdk、jre、javase、javaee、javame的区别
  5. bom-client
  6. java中静态代码块详解
  7. 深入了解Element Form表单动态验证问题 转载
  8. mysql启动错误:mysql.sock丢失
  9. Collections与Arrays
  10. python基础——生成器与迭代器