Spark 读取HDFS csv文件并写入hive
2024-10-19 11:55:15
package com.grady
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
/**
* csv 文件数据写入hive
*/
object CsvToHive {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
val fileName = "test1.csv"
csvToHive(spark, fileName)
}
def csvToHive(spark: SparkSession, fileName: String): Unit = {
val hdfsPath = s"/tmp/jiang/${fileName}"
println(s"hdfsPath=${hdfsPath}")
import spark.implicits._
val csvRDD = spark.read
.format("csv")
.option("sep",",")
.load(hdfsPath)
.rdd
csvRDD.foreach(println)
val dataRDD = csvRDD.map(r => Row(r(0).toString.toInt, r(1), r(2), r(3).toString.toInt, r(4)))
val schema = SchemaType.getStudentSchema()
val csvDF = spark.createDataFrame(dataRDD, schema)
csvDF.write.mode(SaveMode.Overwrite)
.format("Hive")
.insertInto("jiang.student")
}
}
执行:spark-submit --master local[2] --num-executors 10 --class com.grady.CsvToHive /app/data/appdeploy/usehive1-1.0-SNAPSHOT.jar
最新文章
- iOS之常用宏定义
- 流式大数据处理的三种框架:Storm,Spark和Samza
- http://www.iis.net/downloads/microsoft/url-rewrite
- 第二十一课:js属性操作的兼容性问题
- DispatcherServlet中使用的特殊的Bean
- 【原】Kryo序列化篇
- Android中完全退出当前应用系统
- Orchard开源ASP.NET MVC CMS简介
- HDOJ 5071 Chat 模拟
- java I/O框架 (三)基本流
- 对于vxworks下硬盘驱动
- ELK学习总结(2-5)elk的版本控制
- IntelliJ IDEA部署web项目,Tomcat没有出现Artifacts
- ubuntu的磁盘扩容
- 在windows下安装、配置、运行PostgreSQL【转】
- 【bzoj 4764】弹飞大爷
- C++中extern(转)
- 使用JS调用手机本地摄像头或者相册图片识别二维码/条形码
- 078 Hbase中rowkey设计原则
- 初识Shell与Shell脚本