上一篇说到,在Spark 2.x当中,实际上SQLContext和HiveContext是过时的,相反是采用SparkSession对象的sql函数来操作SQL语句的。使用这个函数执行SQL语句前需要先调用DataFrame的createOrReplaceTempView注册一个临时表,所以关键是先要将RDD转换成DataFrame。实际上,在Spark中实际声明了

type DataFrame = Dataset[Row]

所以,DataFrame是Dataset[Row]的别名。RDD是提供面向低层次的API,而DataFrame/Dataset提供面向高层次的API(适合于SQL等面向结构化数据的场合)。

下面提供一些Spark SQL程序的例子。

例子一:SparkSQLExam.scala

 package bruce.bigdata.spark.example

 import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._ object SparkSQLExam { case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double) def main(args: Array[String]) { val spark = SparkSession
.builder
.appName("SparkSQLExam")
.getOrCreate() runSparkSQLExam1(spark)
runSparkSQLExam2(spark) spark.stop() } private def runSparkSQLExam1(spark: SparkSession): Unit = { import spark.implicits._ val rddOffices=spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
val officesDataFrame = spark.createDataFrame(rddOffices) officesDataFrame.createOrReplaceTempView("offices")
spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println) } private def runSparkSQLExam2(spark: SparkSession): Unit = { import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._ val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
val dataFrame = spark.createDataFrame(rowRDD, schema) dataFrame.createOrReplaceTempView("offices2")
spark.sql("select city from offices2 where region='Western'").map(t=>"City: " + t(0)).collect.foreach(println) } }

使用下面的命令进行编译:

[root@BruceCentOS4 scala]# scalac SparkSQLExam.scala

在编译之前,需要在CLASSPATH中增加路径:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

然后打包成jar文件:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

然后通过spark-submit提交程序到yarn集群执行,为了方便从客户端查看结果,这里采用yarn cient模式运行。

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

运行结果截图:

例子二:SparkSQLExam.scala(需要启动hive metastore)

 package  bruce.bigdata.spark.example

 import org.apache.spark.sql.{SaveMode, SparkSession}

 object SparkHiveExam {

     def main(args: Array[String]) {

         val spark = SparkSession
.builder()
.appName("Spark Hive Exam")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate() import spark.implicits._ //使用hql查看hive数据
spark.sql("show databases").collect.foreach(println)
spark.sql("use orderdb")
spark.sql("show tables").collect.foreach(println)
spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println) //将hql查询出的数据保存到另外一张新建的hive表
//找出订单金额超过1万美元的产品
spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
spark.sql("""select mfr_id,product_id,description
from products a inner join orders b
on a.mfr_id=b.mfr and a.product_id=b.product
where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales") //将HDFS文件数据导入到hive表中
spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double )
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2") spark.stop()
}
}

使用下面的命令进行编译:

[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

使用下面的命令打包:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

使用下面的命令运行:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

程序运行结果:

 

另外上述程序运行后,hive中多了2张表:

例子三:spark_sql_exam.py

 from __future__ import print_function

 from pyspark.sql import SparkSession
from pyspark.sql.types import * if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL exam") \
.config("spark.some.config.option", "some-value") \
.getOrCreate() schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
StructField("Target", DoubleType(), True), StructField("sales", DoubleType(), False)]) rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("\t")) \
.map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip()))) dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame.createOrReplaceTempView("offices")
spark.sql("select city from offices where region='Eastern'").show() spark.stop()

执行命令运行程序:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

程序运行结果:

例子四:JavaSparkSQLExam.java

 package bruce.bigdata.spark.example;

 import java.util.ArrayList;
import java.util.List; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.AnalysisException; public class JavaSparkSQLExam {
public static void main(String[] args) throws AnalysisException {
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL exam")
.config("spark.some.config.option", "some-value")
.getOrCreate(); List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false)); StructType schema = DataTypes.createStructType(fields); JavaRDD<String> officesRDD = spark.sparkContext()
.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
.toJavaRDD(); JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split("\t");
return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
}); Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema); dataFrame.createOrReplaceTempView("offices");
Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
results.collectAsList().forEach(r -> System.out.println(r)); spark.stop();
}
}

编译打包后通过如下命令执行:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

运行结果:

上面是一些关于Spark SQL程序的一些例子,分别采用了Scala/Python/Java来编写的。另外除了这三种语言,Spark还支持R语言编写程序,因为我自己也不熟悉,就不举例了。不管用什么语言,其实API都是基本一致的,主要是采用DataFrame和Dataset的高层次API来调用和执行SQL。使用这些API,可以轻松的将结构化数据转化成SQL来操作,同时也能够方便的操作Hive中的数据。

最新文章

  1. Linux服务器模型及其对应的程序流程
  2. 一些用过的我常忘记的小知识(web前端)
  3. break continue 区别 以及实例
  4. 使用redis避免客户端频繁提交数据
  5. gnuplot conditional plotting: plot col A:col B if col C == x
  6. [转载]win32 计时器使用
  7. KMS错误代码收集
  8. jQuery.retryAjax
  9. DataTemplate和ControlTemplate联系与区别
  10. JSP环境配置
  11. iOS开发-No matching provisioning profiles found解决方法
  12. 注意在insert插入数据库时的int类型问题
  13. unmount的时候报错
  14. 初识Linux 命令
  15. VS2017编译SFML SDK配制环境详解
  16. python requests模拟登陆正方教务管理系统,并爬取成绩
  17. CodeForces 1151C Problem for Nazar
  18. Github恶搞之自定义你的contribution图表
  19. springboot 日期类型处理
  20. (1)剑指Offer之斐波那契数列问题和跳台阶问题

热门文章

  1. Spring 源码阅读之 深入理解 finishBeanFactoryInitialization
  2. 下载达 10 万次的 IDEA 插件,K8s 一键部署了解一下?
  3. Java_条件控制与循环控制
  4. 实用Linux控制台命令
  5. NetworkManager网络通讯_Example(一)
  6. Bran的内核开发教程(bkerndev)-07 中断描述符表(IDT)
  7. 基本的sql 语句
  8. Windows机器配置启动加载器的高级选项后,机器出现蓝屏,无法RDP
  9. ABAP中将Unicode字符串转换成中文的方法
  10. 爬虫学习--Urllib库基本使用 Day1