本文持续更新中。。。

Spark Session中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。

可以参考,Scala提供的DataFrame API。本文将使用SparkSession进行操作。

一、DataFrame对象的生成

val ss = SparkSession.builder()
          .appName("ta")
          .master("local[4]")
          .config("spark.mongodb.input.uri","mongodb://username:password@192.168.1.3:27017/log.")
          .config("spark.mongodb.output.uri","mongodb://username:password@192.168.1.3:27017/log")

          .config("es.index.auto.create", "true")
          .config("es.nodes","192.168.1.1")
          .config("es.port","9200")
          .getOrCreate()

1.读写mysql数据   

  val url = "jdbc:mysql://m000:3306/test"
  val jdbcDF = ss.read.format( "jdbc" ).options(Map( "url" -> url,"user" -> "xxx","password" -> "xxx", "dbtable" -> "xxx" )).load()

  data2DF.write.mode("overwrite").format("jdbc").options(Map("url" ->url, "dbtable" -> "TableName")).save()

2.读写SqlServer数据   

  val sqlUrl="jdbc:sqlserver://192.168.1.3:1433;DatabaseName=mytable;username=xxxx;password=xxx"
  val data2DF = ss.read.format("jdbc").options( Map("url" -> sqlsUrl, "dbtable" -> "TableName")).load()

  data2DF.write.mode("overwrite").format("jdbc").options(Map("url" ->sqlUrl, "dbtable" -> "TableName")).save()

3.读写MongoDB数据

  import com.mongodb.spark._
  import com.mongodb.spark.config.ReadConfig
  读取
    val data1DF = MongoSpark.load(ss, ReadConfig(Map("collection" -> "TableName"), Some(ReadConfig(ss))))
    val data2=ss.sparkContext.loadFromMongoDB(ReadConfig(Map("uri" -> readUrl))).toDF()
    第一种方式适用于读取同一个库中的数据,当在不同库中读取数据时,可以使用第二种
    MongoSpark.save(datas.write.option("collection", "documentName").mode("append"))

4.读写ES数据

  import org.elasticsearch.spark.sql._

  ss.esDF("/spark_applog/applog")

  df.saveToEs("/spark_applog/applog")

二、DataFrame对象上Action操作

1、show:展示数据

  以表格的形式在输出中展示jdbcDF中的数据,类似于select * from spark_sql_test的功能。 
  show方法有四种调用方式,分别为, 
(1)show 
  只显示前20条记录。且过长的字符串会被截取
  示例:jdbcDF.show

(2)show(numRows: Int)

  显示numRows条 
  示例:jdbcDF.show(3)

(3)show(truncate: Boolean) 
  是否截取20个字符,默认为true。 
  示例:jdbcDF.show(false)  

(4)show(numRows: Int, truncate: Int) 
  显示记录条数,以及截取字符个数,为0时表示不截取
  示例:jdbcDF.show(3, 0)

2、collect:获取所有数据到数组

  不同于前面的show方法,这里的collect方法会将jdbcDF中的所有数据都获取到,并返回一个Array对象。

jdbcDF.collect()

  结果数组包含了jdbcDF的每一条记录,每一条记录由一个GenericRowWithSchema对象来表示,可以存储字段名及字段值。

3、collectAsList:获取所有数据到List

  功能和collect类似,只不过将返回结构变成了List对象,使用方法如下

jdbcDF.collectAsList()

4、describe(cols: String*):获取指定字段的统计信息

  这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。 
  使用方法如下,其中c1字段为字符类型,c2字段为整型,c4字段为浮点型

jdbcDF .describe("c1" , "c2", "c4" ).show()

  结果如下, 
  

5、first, head, take, takeAsList:获取若干行记录

  这里列出的四个方法比较类似,其中 
  (1)first获取第一行记录 
  (2)head获取第一行记录,head(n: Int)获取前n行记录 
  (3)take(n: Int)获取前n行数据 
  (4)takeAsList(n: Int)获取前n行数据,并以List的形式展现 
  以Row或者Array[Row]的形式返回一行或多行数据。firsthead功能相同。 
  taketakeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

  使用和结果略。

三、DataFrame对象上的条件查询和join等操作

  以下返回为DataFrame类型的方法,可以连续调用。

1、where条件相关

(1)where(conditionExpr: String):SQL语言中where关键字后的条件 
  传入筛选条件表达式,可以用andor。得到DataFrame类型的返回结果, 
  示例:

jdbcDF .where("id = 1 or c1 = 'b'" ).show()

  结果, 
  

(2)filter:根据字段进行筛选 
  传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同 
  示例:jdbcDF .filter("id = 1 or c1 = 'b'" ).show()

  

2、查询指定字段

(1)select:获取指定字段值 
  根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回 
  示例:

jdbcDF.select( "id" , "c3" )

  还有一个重载的select方法,不是传入String类型参数,而是传入Column类型参数。可以实现select id, id+1 from test这种逻辑。

jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)

  结果: 
  

  能得到Column类型的方法是apply以及col方法,一般用apply方法更简便。

(2)selectExpr:可以对指定字段进行特殊处理 
  可以直接对指定字段调用UDF函数,或者指定别名等。传入String类型参数,得到DataFrame对象。 
  示例,查询id字段,c3字段取别名timec4字段四舍五入:

jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)

  结果, 
  

(3)col:获取指定字段 
  只能获取一个字段,返回对象为Column类型。 
  val idCol = jdbcDF.col(“id”)

(4)apply:获取指定字段 
  只能获取一个字段,返回对象为Column类型 
  示例:

val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")

(5)drop:去除指定字段,保留其他字段 
  返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。 
  示例:

jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))

3、limit

  limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和takehead不同的是,limit方法不是Action操作。

jdbcDF.limit(3)

4、order by

(1)orderBysort:按指定字段排序,默认为升序 
  示例1,按指定字段排序。加个-表示降序排序。sortorderBy使用方法相同

jdbcDF.orderBy(- jdbcDF("c4")).show(false) 只能对数字类型和日期类型生效
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)

  结果, 
   

(2)sortWithinPartitions 
  和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。

5、group by

(1)groupBy:根据字段进行group by操作 
  groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。 
  使用方法如下,

jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))

(2)cuberollup:group by的扩展

  功能类似于SQL中的group by cube/rollup

  原表:

    

  cube:       mst.cube("name","class").sum("score").show()

    

  rollup:  mst.rollup("name","class").sum("score").show()

    

(3)GroupedData对象 
  该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,

  • max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
  • min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
  • mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
  • sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
  • count()方法,获取分组中的元素个数

      运行结果示例: 
      count 
      

      max 
      

      这里面比较复杂的是以下两个方法, 
    agg,该方法和下面介绍的类似,可以用于对指定字段进行聚合操作。

   pivot

6、distinct

(1)distinct:返回一个不包含重复记录的DataFrame 
  返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。 
  示例:

jdbcDF.distinct()

  结果, 
  

(2)dropDuplicates:根据指定字段去重 
  根据指定字段去重。类似于select distinct a, b操作 
  示例:

jdbcDF.dropDuplicates(Seq("c1"))

  结果: 
  

7、聚合

  聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。 
  以下示例其中最简单直观的一种用法,对id字段求最大值,对c4字段求和。

jdbcDF.agg("id" -> "max", "c4" -> "sum")

  结果: 
  

8、union

  union方法:对两个DataFrame进行合并
  示例:

jdbcDF.union(jdbcDF.limit(1))

  结果: 
  

9、join

  重点来了。在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。 
  接下来隆重介绍join方法。在DataFrame中提供了六个重载的join方法。 
(1)、笛卡尔积

joinDF1.join(joinDF2)

(2)、using一个字段形式 
  下面这种join类似于a join b using column1的形式,需要两个DataFrame中有相同的一个列名,

joinDF1.join(joinDF2, "id")

(3)、using多个字段形式 
  除了上面这种using一个字段的情况外,还可以using多个字段,如下

joinDF1.join(joinDF2, Seq("id", "name"))

(4)、指定join类型 
  在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型,如下所示

  inner:内连

  outer,full,full_outer:全连

  left, left_outer:左连

  right,right_outer:右连

  left_semi:过滤出joinDF1中和joinDF2共有的部分

  left_anti:过滤出joinDF1中joinDF2没有的部分

joinDF1.join(joinDF2, Seq("id", "name"), "inner")

(5)、使用Column类型来join 
  如果不用using模式,灵活指定join字段的话,可以使用如下形式

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))

  结果如下, 
  

(6)、在指定join字段同时指定join类型 
  如下所示

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")

10、获取指定字段统计信息

  stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。 
  下面代码演示根据c4字段,统计该字段值出现频率在30%以上的内容。在jdbcDF中字段c1的内容为"a, b, a, c, d, b"。其中ab出现的频率为2 / 6,大于0.3

jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()

  结果如下: 
  

11、获取两个DataFrame中共有的记录

  intersect方法可以计算出两个DataFrame中相同的记录,

jdbcDF.intersect(jdbcDF.limit(1)).show(false)

  结果如下: 
  

12、获取一个DataFrame中有另一个DataFrame中没有的记录

  示例:

jdbcDF.except(jdbcDF.limit(1)).show(false)

  结果如下, 
  

13、操作字段名

(1)withColumnRenamed:重命名DataFrame中的指定字段名 
  如果指定的字段名不存在,不进行任何操作。下面示例中将jdbcDF中的id字段重命名为idx

jdbcDF.withColumnRenamed( "id" , "idx" )

  结果如下: 
  

(2)withColumn:往当前DataFrame中新增一列 
  whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。 
  以下代码往jdbcDF中新增一个名为id2的列,

jdbcDF.withColumn("id2", jdbcDF("id")).show( false)

  结果如下, 
  

14、行转列

  有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法 
  下面代码中,根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中,如下所示

jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

  结果如下, 
  

最新文章

  1. Java面试题总结 from Baidu 网易 阿里
  2. ftp应用
  3. ajax onblur 用法
  4. this 指向
  5. js中Number数字数值运算后值不对
  6. leetcode@ [279]Perfect Squares
  7. MVC 控制器详解
  8. 显示出eclipse文件层次
  9. 什么是Bash Shell的内建(build in)命令
  10. 读书笔记 effective c++ Item 15 在资源管理类中提供对原生(raw)资源的访问
  11. Java学习2——HelloWorld(编写第一个java程序)
  12. 死磕 java集合之TreeSet源码分析
  13. PHP错误解决:Fatal error: Unknown: Failed opening required ...
  14. Javascript 对象的创建和属性的判定
  15. DNA甲基化测序方法介绍
  16. NOIP练习赛题目6
  17. Android-Cannot merge new index 66195 into a non-jumbo instruction的解决办法
  18. openstack常用的一些命令
  19. 关于Cocos2d-x中数据的存储提取和类型转换
  20. Sys未定义处理方法

热门文章

  1. 关于限制DHCP服务器广播的另类方法
  2. JS中replace()用法举例
  3. SpringMVC + Spring + Mybatis+ Redis +shiro以及MyBatis学习
  4. FastDFS 简介
  5. 【UML 建模】状态图介绍
  6. LeetCode 476. Number Complement (数的补数)
  7. HDU 6092 Rikka with Subset
  8. CentOS6软raid配置与管理
  9. JAVAscript学习笔记 jsBOM 第七节 (原创) 参考js使用表
  10. HTML学习笔记 域元素(form表单、textarea文本域、fieldset域集合、input使用) 案例 第四节 (原创)