hbase本身提供了 聚合方法可以服务端聚合操作

hbase中的CoprocessorProtocol机制.

CoprocessorProtocol的原理比较简单,近似于一个mapreduce框架。由client将scan分解为面向多个region的请求,并行发送请求到多个region,然后client做一个reduce的操作,得到最后的结果。

先看一个例子,使用hbase的AggregationClient可以做到简单的面向单个column的统计。

  1. @Test
  2. public void testAggregationClient() throws Throwable {
  3. LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();
  4. AggregationClient aggregationClient = new AggregationClient(
  5. CommonConfig.getConfiguration());
  6. Scan scan = new Scan();
  7. scan.addColumn(ColumnFamilyName, QName1);
  8. Long max = aggregationClient.max(TableNameBytes, columnInterpreter,
  9. scan);
  10. Assert.assertTrue(max.longValue() == 100);
  11. Long min = aggregationClient.min(TableNameBytes, columnInterpreter,
  12. scan);
  13. Assert.assertTrue(min.longValue() == 20);
  14. Long sum = aggregationClient.sum(TableNameBytes, columnInterpreter,
  15. scan);
  16. Assert.assertTrue(sum.longValue() == 120);
  17. Long count = aggregationClient.rowCount(TableNameBytes,
  18. columnInterpreter, scan);
  19. Assert.assertTrue(count.longValue() == 4);
  20. }

看下hbase的源码。AggregateImplementation

  1. @Override
  2. public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
  3. throws IOException {
  4. T temp;
  5. T max = null;
  6. InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
  7. .getRegion().getScanner(scan);
  8. List<KeyValue> results = new ArrayList<KeyValue>();
  9. byte[] colFamily = scan.getFamilies()[0];
  10. byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
  11. // qualifier can be null.
  12. try {
  13. boolean hasMoreRows = false;
  14. do {
  15. hasMoreRows = scanner.next(results);
  16. for (KeyValue kv : results) {
  17. temp = ci.getValue(colFamily, qualifier, kv);
  18. max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
  19. }
  20. results.clear();
  21. } while (hasMoreRows);
  22. } finally {
  23. scanner.close();
  24. }
  25. log.info("Maximum from this region is "
  26. + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
  27. .getRegionNameAsString() + ": " + max);
  28. return max;
  29. }

这里由于

  1. byte[] colFamily = scan.getFamilies()[0];
  2. byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();

所以,hbase自带的Aggregate函数,只能面向单列进行统计。

当我们想对多列进行Aggregate,并同时进行countRow时,有以下选择。 
1 scan出所有的row,程序自己进行Aggregate和count。 
2 使用AggregationClient,调用多次,得到所有的结果。由于多次调用,有一致性问题。 
3 自己扩展CoprocessorProtocol。

这个是github的hbase集成插件

这个功能集成到simplehbase里面了。
https://github.com/zhang-xzhi/simplehbase

最新文章

  1. PHP explode()函数
  2. linux socket连接中 ERRNO错误
  3. [UCSD白板题] Fractional Knapsack
  4. 机器学习之神经网络模型-上(Neural Networks: Representation)
  5. C# 多线程 方法,类的标记
  6. LoadRunner的场景设置
  7. HDU_1245_Saving James Bond_最短路
  8. 非root启动80端口
  9. freemarker报错之十
  10. pip install PIL The _imagingft C module is not installed
  11. 无服务器架构(Faas/Serverless)
  12. datetime模块
  13. [翻译] Visual Studio 2019 RC版发布
  14. git 远程删除文件
  15. macos 下安装brew
  16. Python3 函数注解
  17. JVM 堆内存和非堆内存
  18. System.Diagnostics.Debug和System.Diagnostics.Trace
  19. iOS 里RGB 配色 UIColor colorWithRed
  20. bootstrapValidator验证中Maximum call stack size exceeded

热门文章

  1. &lt;Python基础&gt;字典的基本操作
  2. NTT FWT(xor or and) 模板
  3. arp协议简单介绍
  4. Pthread spinlock自旋锁
  5. minutia cylinder code MCC lSSR 匹配算法
  6. HTML和css简单日常总结
  7. bzoj1913: [Apio2010]signaling 信号覆盖
  8. js声明变量的三种方式
  9. 清空标签间的内容(innerHTML)和 value
  10. System.Web.Mvc.HttpStatusCodeResult.cs