本文将分两部分介绍,第一部分讲解使用 HBase 新版 API 进行 CRUD 基本操作;第二部分讲解如何将 Spark 内的 RDDs 写入 HBase 的表中,反之,HBase 中的表又是如何以 RDDs 形式加载进 Spark 内的。

环境配置

为了避免版本不一致带来不必要的麻烦,API 和 HBase环境都是 1.0.0 版本。HBase 为单机模式,分布式模式的使用方法类似,只需要修改HBaseConfiguration的配置即可。

开发环境中使用 SBT 加载依赖项

name := "SparkLearn"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.0.0"

libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.0.0"

libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.0.0"

HBase 的 CRUD 操作

新版 API 中加入了 ConnectionHAdmin成了AdminHTable成了Table,而AdminTable只能通过Connection获得。Connection的创建是个重量级的操作,由于Connection是线程安全的,所以推荐使用单例,其工厂方法需要一个HBaseConfiguration

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master") //Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
val conn = ConnectionFactory.createConnection(conf)

创建表

使用Admin创建和删除表

val userTable = TableName.valueOf("user")

//创建 user 表
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
println("Creating table `user`. ")
if (admin.tableExists(userTable)) {
admin.disableTable(userTable)
admin.deleteTable(userTable)
}
admin.createTable(tableDescr)
println("Done!")

插入、查询、扫描、删除操作

HBase 上的操作都需要先创建一个操作对象Put,Get,Delete等,然后调用Table上的相对应的方法

try{
//获取 user 表
val table = conn.getTable(userTable) try{
//准备插入一条 key 为 id001 的数据
val p = new Put("id001".getBytes)
//为put操作指定 column 和 value (以前的 put.add 方法被弃用了)
p.addColumn("basic".getBytes,"name".getBytes, "wuchong".getBytes)
//提交
table.put(p) //查询某条数据
val g = new Get("id001".getBytes)
val result = table.get(g)
val value = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
println("GET id001 :"+value) //扫描数据
val s = new Scan()
s.addColumn("basic".getBytes,"name".getBytes)
val scanner = table.getScanner(s) try{
for(r <- scanner){
println("Found row: "+r)
println("Found value: "+Bytes.toString(
r.getValue("basic".getBytes,"name".getBytes)))
}
}finally {
//确保scanner关闭
scanner.close()
} //删除某条数据,操作方式与 Put 类似
val d = new Delete("id001".getBytes)
d.addColumn("basic".getBytes,"name".getBytes)
table.delete(d) }finally {
if(table != null) table.close()
} }finally {
conn.close()
}

Spark 操作 HBase

写入 HBase

首先要向 HBase 写入数据,我们需要用到PairRDDFunctions.saveAsHadoopDataset。因为 HBase 不是一个文件系统,所以saveAsHadoopFile方法没用。

def saveAsHadoopDataset(conf: JobConf): Unit
Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system

这个方法需要一个 JobConf 作为参数,类似于一个配置项,主要需要指定输出的格式和输出的表名。

Step 1:我们需要先创建一个 JobConf。

//定义 HBase 的配置
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master") //指定输出格式和输出表名
val jobConf = new JobConf(conf,this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"user")

Step 2: RDD 到表模式的映射
在 HBase 中的表 schema 一般是这样的:

row     cf:col_1    cf:col_2

而在Spark中,我们操作的是RDD元组,比如(1,"lilei",14)(2,"hanmei",18)。我们需要将 RDD[(uid:Int, name:String, age:Int)] 转换成 RDD[(ImmutableBytesWritable, Put)]。所以,我们定义一个 convert 函数做这个转换工作

def convert(triple: (Int, String, Int)) = {
val p = new Put(Bytes.toBytes(triple._1))
p.addColumn(Bytes.toBytes("basic"),Bytes.toBytes("name"),Bytes.toBytes(triple._2))
p.addColumn(Bytes.toBytes("basic"),Bytes.toBytes("age"),Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, p)
}

Step 3: 读取RDD并转换

//read RDD data from somewhere and convert
val rawData = List((1,"lilei",14), (2,"hanmei",18), (3,"someone",38))
val localData = sc.parallelize(rawData).map(convert)

Step 4: 使用saveAsHadoopDataset方法写入HBase

localData.saveAsHadoopDataset(jobConf)

读取 HBase

Spark读取HBase,我们主要使用SparkContext 提供的newAPIHadoopRDDAPI将表的内容以 RDDs 的形式加载到 Spark 中。

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master") //设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "user") val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]) val count = usersRDD.count()
println("Users RDD Count:" + count)
usersRDD.cache() //遍历输出
usersRDD.foreach{ case (_,result) =>
val key = Bytes.toInt(result.getRow)
val name = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("basic".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}

最新文章

  1. php读取大文件
  2. 【转】MSM搭建(Memcached_Session_Manager)--解决集群session共享
  3. 【Leetcode】 - Divide Two Integers 位运算实现整数除法
  4. javascript google map circle radius_changed ,angularjs google map circle radius_changed
  5. android ioctl fuzz,android 本地提权漏洞 android root
  6. 获取sqlserver数据库中所有库、表、字段名的方法
  7. Serializable接口的背后
  8. c语言,链表
  9. Redis在win7上的安装与可视化应用
  10. Windows Phone获取WiFi BSSID
  11. Claris and XOR
  12. 设计模式成长记(一) 抽象工厂模式(Abstract Factory)
  13. Pycharm配置支持vue语法
  14. 003.MMM双主-双从读写分离部署
  15. socket.timeout: The read operation timed out 更改pip源至国内镜像,显著提升下载速度
  16. eclipse 对 hadoop1.2.1 hdfs 文件操作
  17. 【摘】SVN提交与版本冲突
  18. linux 驱动之LCD驱动(有framebuffer)
  19. python --help查询python相关命令
  20. EasyUI 让dialog中的treegrid的列头固定

热门文章

  1. Unity3d中使用assetbundle
  2. usb-host一步一步学(一)安卓在usb-host模式下列出当前连接的usb设备
  3. 数据库(DBUtils)
  4. hibernate课程 初探单表映射1-2 ORM定义
  5. git 如何生成 SSH 公钥
  6. webpack 安装后提示CLI
  7. UINavigationControlle 之 UINavigationBar及navigationItem关系探讨
  8. springMvc-reset风格和对静态资源的管理
  9. 碰到一个微软的bug:CWinAppEx::GetString
  10. IOS 控件器的创建方式(ViewController)