使用Spark读写HDFS中的parquet文件

文件夹中的parquet文件

build.sbt文件

name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

Scala实现方法

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat /**
* Created by mi on 17-4-11.
*/ case class resultset(name: String,
info: String,
summary: String) case class IntroItem(name: String, value: String) case class BaikeLocation(name: String,
url: String = "",
info: Seq[IntroItem] = Seq(),
summary: Option[String] = None) case class MewBaikeLocation(name: String,
url: String = "",
info: Option[String] = None,
summary: Option[String] = None) object MysqlOpt { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ //定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
val table = "baike_pages" //读取parquetFile,并写入Mysql
val sparkSession = SparkSession.builder()
.master("local")
.appName("spark session example")
.getOrCreate()
val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
// parquetDF.collect().take(20).foreach(println)
//parquetDF.show() //BaikeLocation是读取的parquet文件中的case class
val ds = parquetDF.as[BaikeLocation].map { line =>
//把info转换为新的case class中的类型String
val info = line.info.map(item => item.name + ":" + item.value).mkString(",")
//注意需要把字段放在一个case class中,不然会丢失列信息
MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)
}.cache() ds.show()
// ds.take(2).foreach(println) //写入Mysql
// val prop = new Properties()
// prop.setProperty("user", "root")
// prop.setProperty("password", "123456")
// ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop) //写入parquetFile
ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1") } }

df.show打印出来的信息,如果没放在一个case class中的话,name,url,info,summary这列信息会变成1,2,3,4

使用spark-shell查看写回去的parquet文件的信息

#进入spark-shell
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val path = "file:///home/mi/coding/coding/baikeshow_data/baikeshow1"
val df = sqlContext.parquetFile(path)
df.show
df.count

如果只想显示某一列的话,可以这么做

df.select("title").take(100).foreach(println)  //只显示title这一列的信息

最新文章

  1. Spring mvc @initBinder 类型转化器的使用
  2. Android Studio 快捷键
  3. 玩转Windows服务系列——创建Windows服务
  4. Tomcat:配置SSL
  5. TCP 状态机
  6. 在将 varchar 值 '1,2,3,4,5,6,7,8' 转换成数据类型 int 时失败。
  7. .NET 面试题整理
  8. BZOJ4373 : 算术天才⑨与等差数列
  9. hitTest:withEvent:方法(此方法可实现点击穿透、点击下层视图功能)
  10. 初入thinkphp
  11. Android 自定义 HorizontalScrollView 打造再多图片(控件)也不怕 OOM 的横向滑动效果
  12. Python3 tkinter基础 Text window 文本框中插入按钮
  13. VMWare上的ubuntu系统安装VMWare Tools(图文)
  14. ie每次登陆出现:Windows安全性 iexplore.exe 正在连接到 记住我的凭证不起作用
  15. 未能加载文件或程序集"CheckRegister"或它的某一个依赖项.参数错误. (异常来
  16. Magpie
  17. LINUX-iostat命令讲解
  18. 关于CentOS 6下Hadoop占用系统态CPU高的处理办法【转】
  19. Jmeter接口自动化测试 (四)(持续构建)
  20. Microsoft Dynamics CRM 2011 Plugin中PluginExecutionContext.InputParameters["Target"]中的Target是从哪来的?

热门文章

  1. 潭州课堂25班:Ph201805201 爬虫高级 第五课 sclapy 框架 日志和 settings 配置 模拟登录(课堂笔记)
  2. java三大特性--多态(1)
  3. elastic-job 新手指南
  4. git存储用户名与密码
  5. 【.NET线程--进阶(一)】--线程方法详解
  6. .net源码调试 http://referencesource.microsoft.com/
  7. Android性能优化-减小APK大小
  8. 通过action传过来的值在option获取进行验证
  9. zabbix监控k8s出现的pod error status
  10. PhoneGap-Android-HTML5-WebSocket