1. Existing third-party libraries don't fully cover this use case
- Official hbase-spark connector: cannot set the cell timestamp
- unicredit/hbase-rdd: the API is overly complex and cannot write multiple column families at once

2. HFiles must be sorted, and the sort order is defined by KeyValue.KVComparator, so we define a custom comparator (MyComparator below) that internally delegates to KeyValue.KVComparator

3. Without a custom partitioner, you are very likely to hit the following exception:
ERROR: "java.io.IOException: Retry attempted 10 times without completing, bailing out"
https://community.hortonworks.com/content/supportkb/150138/error-javaioioexception-retry-attempted-10-times-w.html

The custom partitioner is based on: https://github.com/unicredit/hbase-rdd/blob/master/src/main/scala/unicredit/spark/hbase/HFileSupport.scala

4. Many blog posts include the code below. I initially assumed it could be used to partition the RDD, but it actually has no effect here: these are MapReduce job settings and do not apply in Spark (see the sketch after this list for what does the partitioning on the Spark side).
val job = Job.getInstance(hbaseConfig)
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor, regionLocator)
job.getConfiguration
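
What actually drives the partitioning and sorting in Spark is the RDD pipeline itself. A minimal sketch of the shape used in the full code below, reusing the variables from bulkloadWrite (rdd, hbaseConfig, regionLocator, hFilePath) and the helpers defined later in this post:

// Sketch only: the Spark-side replacement for configureIncrementalLoad.
val partitioner = HFilePartitionerHelper.HFilePartitioner(hbaseConfig, regionLocator.getStartKeys, 1)

rdd.flatMap(BulkloadHelper.putToKeyValueList)      // Put -> serializable, sortable cells (MyKeyValue)
  .map(kv => (kv, 1))                              // key by MyKeyValue so its Ordered is used for sorting
  .repartitionAndSortWithinPartitions(partitioner) // one sorted partition per region
  .map { case (kv, _) => (new ImmutableBytesWritable(kv.row), kv.getKeyValue()) }
  .saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue],
    classOf[HFileOutputFormat2], hbaseConfig)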

Other notes:
1. Implementing the Serializable interface in Scala (see MyKeyValue below)
2. HFilePartitioner: uses HBase's regionLocator.getStartKeys to split the Puts in the RDD into partitions by rowkey; each partition produces one HFile that lines up with an HBase region (see the partitioner code and the usage sketch below)

Code, to be cleaned up later:

import java.time.LocalDate
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD

import HFilePartitionerHelper.HFilePartitioner

object BulkloadHelper {
  private val logger = Logger.getLogger(this.getClass)

  def bulkloadWrite(rdd: RDD[Put], hbaseConfig: Configuration, thisTableName: TableName): Unit = {
    val hbaseConnection = ConnectionFactory.createConnection(hbaseConfig)
    val regionLocator = hbaseConnection.getRegionLocator(thisTableName)
    val myPartitioner = HFilePartitioner.apply(hbaseConfig, regionLocator.getStartKeys, 1)

    logger.info(s"regionLocator.getStartKeys.length = ${regionLocator.getStartKeys.length}")
    regionLocator.getStartKeys.foreach(keys => logger.info("regionLocator.getStartKeys: " + new String(keys)))

    val hFilePath = getHFilePath()
    logger.info(s"bulkload, begin to write to hdfs path: $hFilePath")

    // HFile entries must be sorted by KeyValue.KVComparator (CellComparator in newer HBase),
    // which is exactly what MyKeyValue's Ordered implementation delegates to.
    rdd.flatMap(put => putToKeyValueList(put))
      .map(c => (c, 1))
      .repartitionAndSortWithinPartitions(myPartitioner) // repartition so each HFile matches one HBase region
      .map(tuple => (new ImmutableBytesWritable(tuple._1.row), tuple._1.getKeyValue()))
      .saveAsNewAPIHadoopFile(
        hFilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        hbaseConfig)

    // Bulk load the HFiles into HBase
    logger.info("bulkload, begin to load to hbase")
    val bulkLoader = new LoadIncrementalHFiles(hbaseConfig)
    bulkLoader.doBulkLoad(new Path(hFilePath), new HTable(hbaseConfig, thisTableName))

    logger.info("bulkload, delete hdfs path")
    val hadoopConf = new Configuration()
    val fileSystem = FileSystem.get(hadoopConf)
    fileSystem.delete(new Path(hFilePath), true)
    hbaseConnection.close()
    fileSystem.close()
    logger.info("bulkload, done")
  }

  def getHFilePath(): String =
    "hdfs:///user/hadoop/hbase/bulkload/hfile/" + LocalDate.now().toString + "-" + UUID.randomUUID().toString

  /**
   * Expand a Put into one serializable, sortable MyKeyValue per cell.
   * @param put the Put whose cells are extracted
   */
  def putToKeyValueList(put: Put): Seq[MyKeyValue] = {
    put.getFamilyCellMap.asScala
      .flatMap(_._2.asScala) // all cells across all families
      .map(cell => new MyKeyValue(put.getRow, cell.getFamily, cell.getQualifier, cell.getTimestamp, cell.getValue))
      .toSeq
  }
}
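
A usage sketch, assuming a SparkContext named sc is already available and the target table exists and is pre-split; the table name "my_table", family "cf" and qualifier "q" are made up for illustration:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

val hbaseConfig = HBaseConfiguration.create()
val puts: RDD[Put] = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2"))).map { case (rowKey, v) =>
  val put = new Put(Bytes.toBytes(rowKey))
  // addColumn with an explicit timestamp -- the thing the official hbase-spark connector could not do
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), System.currentTimeMillis(), Bytes.toBytes(v))
  put
}

BulkloadHelper.bulkloadWrite(puts, hbaseConfig, TableName.valueOf("my_table"))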

  

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.hbase.KeyValue

class MyKeyValue(var row: Array[Byte], var family: Array[Byte], var qualifier: Array[Byte],
                 var timestamp: Long, var value: Array[Byte])
  extends Serializable with Ordered[MyKeyValue] {

  // KeyValue itself is not Serializable, so it is rebuilt lazily from the raw byte arrays.
  var keyValue: KeyValue = _

  def getKeyValue(): KeyValue = {
    if (keyValue == null) {
      keyValue = new KeyValue(row, family, qualifier, timestamp, value)
    }
    keyValue
  }

  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit = {
    keyValue = null // drop the non-serializable KeyValue before default serialization
    out.defaultWriteObject()
    out.writeObject(this)
  }

  @throws[IOException]
  @throws[ClassNotFoundException]
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val newKeyValue = in.readObject().asInstanceOf[MyKeyValue]
    this.row = newKeyValue.row
    this.family = newKeyValue.family
    this.qualifier = newKeyValue.qualifier
    this.timestamp = newKeyValue.timestamp
    this.value = newKeyValue.value
    getKeyValue()
  }

  // KVComparator is not Serializable either, so wrap it in a Serializable subclass.
  class MyComparator extends KeyValue.KVComparator with Serializable {}

  val comparator = new MyComparator()

  override def compare(that: MyKeyValue): Int =
    comparator.compare(this.getKeyValue(), that.getKeyValue())

  override def toString: String = getKeyValue().toString
}
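
A quick, illustrative check of the ordering: because MyKeyValue delegates to KeyValue.KVComparator, sorting MyKeyValue instances yields exactly the order HFileOutputFormat2 expects. The row keys here are made up:

import org.apache.hadoop.hbase.util.Bytes

val a = new MyKeyValue(Bytes.toBytes("row-a"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("1"))
val b = new MyKeyValue(Bytes.toBytes("row-b"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("2"))

assert(a.compare(b) < 0)           // "row-a" sorts before "row-b"
assert(Seq(b, a).sorted.head eq a) // Ordered[MyKeyValue] also provides the implicit Ordering Spark needs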

  

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner

object HFilePartitionerHelper {

  object HFilePartitioner {
    def apply(conf: Configuration, splits: Array[Array[Byte]], numFilesPerRegionPerFamily: Int): HFilePartitioner =
      if (numFilesPerRegionPerFamily == 1)
        new SingleHFilePartitioner(splits)
      else {
        // clamp to [1, hbase.mapreduce.bulkload.max.hfiles.perRegionPerFamily] (default 32)
        val fraction = 1 max numFilesPerRegionPerFamily min conf.getInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 32)
        new MultiHFilePartitioner(splits, fraction)
      }
  }

  protected abstract class HFilePartitioner extends Partitioner {
    def extractKey(n: Any): Array[Byte] = n match {
      case kv: MyKeyValue => kv.row
    }
  }

  // Several HFiles per region: rows of one region are spread over `fraction` partitions by hash.
  private class MultiHFilePartitioner(splits: Array[Array[Byte]], fraction: Int) extends HFilePartitioner {
    override def getPartition(key: Any): Int = {
      val k = extractKey(key)
      val h = (k.hashCode() & Int.MaxValue) % fraction
      for (i <- 1 until splits.length)
        if (Bytes.compareTo(k, splits(i)) < 0) return (i - 1) * fraction + h
      (splits.length - 1) * fraction + h
    }

    override def numPartitions: Int = splits.length * fraction
  }

  // One HFile per region: a row goes to the partition of the last region start key that is <= the row.
  private class SingleHFilePartitioner(splits: Array[Array[Byte]]) extends HFilePartitioner {
    override def getPartition(key: Any): Int = {
      val k = extractKey(key)
      for (i <- 1 until splits.length)
        if (Bytes.compareTo(k, splits(i)) < 0) return i - 1
      splits.length - 1
    }

    override def numPartitions: Int = splits.length
  }
}
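
To make the getStartKeys note concrete, a small illustrative example with made-up region boundaries: for a table pre-split into two regions with start keys "" and "m", rows before "m" land in partition 0 and everything else in partition 1, so each Spark partition becomes one HFile for one region:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner

val startKeys: Array[Array[Byte]] = Array(Array.emptyByteArray, Bytes.toBytes("m"))
val partitioner: Partitioner = HFilePartitionerHelper.HFilePartitioner(HBaseConfiguration.create(), startKeys, 1)

val kvB = new MyKeyValue(Bytes.toBytes("b"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("x"))
val kvQ = new MyKeyValue(Bytes.toBytes("q"), Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("y"))

assert(partitioner.numPartitions == 2)
assert(partitioner.getPartition(kvB) == 0) // "b" < "m" -> first region's HFile
assert(partitioner.getPartition(kvQ) == 1) // "q" >= "m" -> second region's HFile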

  
