SparkSQLDemo.scala


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SparkSQLDemo {

  // $example on:create_ds$
  case class Person(name: String, age: Long)
  // $example off:create_ds$

  def main(args: Array[String]): Unit = {
    // Start the SparkSession
    // $example on:init_session$
    val spark = SparkSession
      .builder()
      .appName("SparkSQLDemo")
      .master("local")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    // $example off:init_session$

    // runBasicDataFrameDemo(spark)
    // runDatasetCreationDemo(spark)
    // runInferSchemaDemo(spark)
    runProgrammaticSchemaDemo(spark)

    // Stop the SparkSession
    spark.stop()
  }

  private def runBasicDataFrameDemo(spark: SparkSession) = {
    val df = spark.read.json("/Users/hadoop/app/spark/examples/src/main/resources/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    // Print the schema in a tree format
    df.printSchema()

    // Select only the "name" column
    df.select("name").show()

    // This import is needed to use the $-notation
    import spark.implicits._
    df.select($"name", $"age" + 1).show()

    // Select people older than 21 (filter, not select, so non-matching rows are dropped)
    df.filter($"age" > 21).show()

    // Count people by age
    df.groupBy("age").count().show()

    // $example on:global_temp_view$
    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("select * from people")
    sqlDF.show()

    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // A global temporary view is tied to the system-preserved database `global_temp`
    spark.sql("select * from global_temp.people").show()

    // A global temporary view is cross-session
    spark.newSession().sql("select * from global_temp.people").show()
    // $example off:global_temp_view$
  }

  private def runDatasetCreationDemo(spark: SparkSession) = {
    // spark.implicits._ brings in the implicit conversions (e.g. toDS()) that
    // wrap local collections and RDDs in Datasets; import it to use them.
    import spark.implicits._

    // Note: toDS() is declared with parentheses to prevent the Scala compiler from
    // treating `rdd.toDS("1")` as calling toDS and then applying the returned Dataset.

    // Encoders are created automatically for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()

    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect().foreach(println)

    // DataFrames can be converted to a Dataset by providing a class; mapping is done by field name
    val path = "/Users/hadoop/app/spark/examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
  }

  private def runInferSchemaDemo(spark: SparkSession) = {
    // $example on:schema_inferring$
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._

    // Create an RDD of Person objects from a text file and convert it to a DataFrame
    val peopleDF = spark.sparkContext
      .textFile("/Users/hadoop/app/spark/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(x => Person(x(0), x(1).trim.toInt))
      .toDF()

    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql method provided by Spark
    val teenagersDF = spark.sql("select name, age from people where age between 13 and 19")

    // The columns of a row in the result can be accessed by field index...
    teenagersDF.map(teenager => s"Name: ${teenager(0)}").show()

    // ...or by field name
    teenagersDF.map(teenager => s"Name: ${teenager.getAs[String]("name")}").show()

    // There is no pre-defined encoder for Dataset[Map[K, V]], so define one explicitly
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    // Encoders for primitive types and case classes can also be defined as implicit vals, e.g.:
    // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

    // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
    teenagersDF.map(teenager =>
      teenager.getValuesMap[Any](List("name", "age"))
    ).collect().foreach(println)
    // $example off:schema_inferring$
  }

  private def runProgrammaticSchemaDemo(spark: SparkSession) = {
    import spark.implicits._
    // $example on:programmatic_schema$
    // Create an RDD
    val peopleRDD = spark.sparkContext
      .textFile("/Users/hadoop/app/spark/examples/src/main/resources/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Generate the schema based on the schema string
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert records of the RDD (people) to Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))

    // Apply the schema to the RDD
    val peopleDF = spark.createDataFrame(rowRDD, schema)

    // Create a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("select name from people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations;
    // the columns of a row can be accessed by field index or by field name
    results.map(attributes => s"Name: ${attributes(0)}").show()
    // $example off:programmatic_schema$
  }
}
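For reference, the hard-coded paths above point at the sample data that ships with a Spark source checkout under examples/src/main/resources; adjust the absolute paths to your own installation. Assuming the stock files from the Spark repository, the two inputs contain:

people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt
Michael, 29
Andy, 30
Justin, 19

With that input, the runProgrammaticSchemaDemo(spark) call in main should print roughly the following for results.map(...).show():

+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+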

