hbase 聚合操作
2024-09-02 17:39:17
hbase本身提供了 聚合方法可以服务端聚合操作
hbase中的CoprocessorProtocol机制.
CoprocessorProtocol的原理比较简单,近似于一个mapreduce框架。由client将scan分解为面向多个region的请求,并行发送请求到多个region,然后client做一个reduce的操作,得到最后的结果。
先看一个例子,使用hbase的AggregationClient可以做到简单的面向单个column的统计。
- @Test
- public void testAggregationClient() throws Throwable {
- LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();
- AggregationClient aggregationClient = new AggregationClient(
- CommonConfig.getConfiguration());
- Scan scan = new Scan();
- scan.addColumn(ColumnFamilyName, QName1);
- Long max = aggregationClient.max(TableNameBytes, columnInterpreter,
- scan);
- Assert.assertTrue(max.longValue() == 100);
- Long min = aggregationClient.min(TableNameBytes, columnInterpreter,
- scan);
- Assert.assertTrue(min.longValue() == 20);
- Long sum = aggregationClient.sum(TableNameBytes, columnInterpreter,
- scan);
- Assert.assertTrue(sum.longValue() == 120);
- Long count = aggregationClient.rowCount(TableNameBytes,
- columnInterpreter, scan);
- Assert.assertTrue(count.longValue() == 4);
- }
看下hbase的源码。AggregateImplementation
- @Override
- public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
- throws IOException {
- T temp;
- T max = null;
- InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
- .getRegion().getScanner(scan);
- List<KeyValue> results = new ArrayList<KeyValue>();
- byte[] colFamily = scan.getFamilies()[0];
- byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
- // qualifier can be null.
- try {
- boolean hasMoreRows = false;
- do {
- hasMoreRows = scanner.next(results);
- for (KeyValue kv : results) {
- temp = ci.getValue(colFamily, qualifier, kv);
- max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
- }
- results.clear();
- } while (hasMoreRows);
- } finally {
- scanner.close();
- }
- log.info("Maximum from this region is "
- + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
- .getRegionNameAsString() + ": " + max);
- return max;
- }
这里由于
- byte[] colFamily = scan.getFamilies()[0];
- byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
所以,hbase自带的Aggregate函数,只能面向单列进行统计。
当我们想对多列进行Aggregate,并同时进行countRow时,有以下选择。
1 scan出所有的row,程序自己进行Aggregate和count。
2 使用AggregationClient,调用多次,得到所有的结果。由于多次调用,有一致性问题。
3 自己扩展CoprocessorProtocol。
这个是github的hbase集成插件
这个功能集成到simplehbase里面了。
https://github.com/zhang-xzhi/simplehbase
最新文章
- PHP explode()函数
- linux socket连接中 ERRNO错误
- [UCSD白板题] Fractional Knapsack
- 机器学习之神经网络模型-上(Neural Networks: Representation)
- C# 多线程 方法,类的标记
- LoadRunner的场景设置
- HDU_1245_Saving James Bond_最短路
- 非root启动80端口
- freemarker报错之十
- pip install PIL The _imagingft C module is not installed
- 无服务器架构(Faas/Serverless)
- datetime模块
- [翻译] Visual Studio 2019 RC版发布
- git 远程删除文件
- macos 下安装brew
- Python3 函数注解
- JVM 堆内存和非堆内存
- System.Diagnostics.Debug和System.Diagnostics.Trace
- iOS 里RGB 配色 UIColor colorWithRed
- bootstrapValidator验证中Maximum call stack size exceeded
热门文章
- <;Python基础>;字典的基本操作
- NTT FWT(xor or and) 模板
- arp协议简单介绍
- Pthread spinlock自旋锁
- minutia cylinder code MCC lSSR 匹配算法
- HTML和css简单日常总结
- bzoj1913: [Apio2010]signaling 信号覆盖
- js声明变量的三种方式
- 清空标签间的内容(innerHTML)和 value
- System.Web.Mvc.HttpStatusCodeResult.cs