数据源-基本操作load和save

object BasicTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("BasicTest")
.master("local")
.getOrCreate() //最基本的读取(load)和保存(write)操作,操作的文件的数据格式默认是parquet
val sessionDF = spark.read.load(s"${BASE_PATH}/trackerSession")
sessionDF.show() sessionDF.select("ip", "cookie").write.save(s"${BASE_PATH}/trackerSession_ip_cookie") //可以读取多个文件目录下的数据文件
val multiSessionDF = spark.read.load(s"${BASE_PATH}/trackerSession",
s"${BASE_PATH}/trackerSession_ip_cookie")
multiSessionDF.show() //读取的时候指定schema
val schema = StructType(StructField("ip", StringType) :: Nil)
val specSessionDF = spark.read.schema(schema).load(s"${BASE_PATH}/trackerSession")
specSessionDF.show() //指定数据源数据格式
//读取json文件, 且将读取出来的数据保存为parquet文件
val deviceInfoDF = spark.read.format("json").load(s"${BASE_PATH}/IoT_device_info.json")
spark.read.json(s"${BASE_PATH}/IoT_device_info.json").show() deviceInfoDF.write.format("orc").save(s"${BASE_PATH}/iot")
deviceInfoDF.write.orc(s"${BASE_PATH}/iot2") //option传递参数,改变读写数据源的行为
spark.read.option("mergeSchema", "true").parquet(s"${BASE_PATH}/trackerSession")
deviceInfoDF.write.option("compression", "snappy").parquet(s"${BASE_PATH}/iot2_parquet") val optsMap = Map("mergeSchema" -> "mergeSchema")
spark.read.options(optsMap).parquet("") //SaveMode
//SaveMode.ErrorIfExists(对应着字符串"error"):表示如果目标文件目录中数据已经存在了,则抛异常(这个是默认的配置)
//SaveMode.Append(对应着字符串"append"):表示如果目标文件目录中数据已经存在了,则将数据追加到目标文件中
//SaveMode.Overwrite(对应着字符串"overwrite"):表示如果目标文件目录中数据已经存在了,则用需要保存的数据覆盖掉已经存在的数据
//SaveMode.Ignore(对应着字符串为:"ignore"):表示如果目标文件目录中数据已经存在了,则不做任何操作 deviceInfoDF.write.option("compression", "snappy").mode(SaveMode.Ignore).parquet(s"${BASE_PATH}/iot/iot2_parquet")
spark.read.parquet(s"${BASE_PATH}/iot/iot2_parquet").show()
deviceInfoDF.write.option("compression", "snappy").mode("ignore").parquet(s"${BASE_PATH}/iot/iot2_parquet") spark.stop()
}
}

  

最新文章

  1. [MFC] VS2013版本MFC工程移植到VC6.0上
  2. asp.net 的page 基类页面 做一些判断 可以定义一个基类页面 继承Page类 然后重写OnPreLoad事件
  3. perl基本语法--转载
  4. 《Play for Java》学习笔记(四)Controller
  5. C语言每日一题之No.8
  6. MySQL CAST与CONVERT 函数的用法
  7. Library:python-memcached on Windows
  8. [Oracle] 常用工具集之 - SQL*Loader
  9. PHP中使用 $_GET 与$_POST 实现简单的前后台数据传输交互功能
  10. GridView中使用 jQuery DatePicker (UpdatePanel)
  11. Flask框架基础--第一篇
  12. 2.8 break和continue
  13. JDK源码分析(6)ConcurrentHashMap
  14. 如何把大段文字转为带html标签的文字
  15. axios全局设置url公共请求头
  16. python第二天 python介绍与变量
  17. java虚拟机的原理
  18. Spring事务传播属性介绍(一).required 和 reuqires_new
  19. R语言进行数据预处理
  20. GNU C 中零长度的数组【转】

热门文章

  1. cloudera cdh6.3 离线安装 经典大数据平台视频教程(含网盘下载地址)
  2. 用LabVIEW做声源定位系统
  3. 记一次奇怪的python多个变量拼接后的字符串丢失事件
  4. The One day 中位数的计算
  5. HttpClient参观记:.net core 2.2 对HttpClient到底做了神马
  6. ZooKeeper系列(四)—— Java 客户端 Apache Curator
  7. C# 的ToString 常用方法
  8. SOFT-NMS (二) (non maximum suppression,非极大值抑制)
  9. 嵌入式应用开发第四阶段-基于rk3399的视频监控系统
  10. 如何在SAP Cloud Platform上进行第一个integration flow开发