网站推广四个阶段,保康网站建设,宜昌网站制作公司亿腾,哈尔滨线下教学最新情况背景#xff1a; 对于其他数据存储系统来说#xff0c;统计表的行数是再基本不过的操作了#xff0c;一般实现都非常简单#xff1b;但对于HBase这种key-value存储结构的列式数据库#xff0c;统计 RowCount 的方法却有好几种不同的花样#xff0c;并且执行效率差别巨大 对于其他数据存储系统来说统计表的行数是再基本不过的操作了一般实现都非常简单但对于HBase这种key-value存储结构的列式数据库统计 RowCount 的方法却有好几种不同的花样并且执行效率差别巨大下面来研究下吧~ 测试集群HBase1.2.0 - CDH5.13.0 四台服务器 注以下4种方法效率依次提高 一、hbase-shell的count命令 这是最简单直接的操作但是执行效率非常低适用于百万级以下的小表RowCount统计 hbase count ns1:t1hbase count t1hbase count t1, INTERVAL 100000hbase count t1, CACHE 1000hbase count t1, INTERVAL 10, CACHE 1000此操作可能需要很长时间来运行计数MapReduce作业。默认情况下每1000行显示当前计数计数间隔可自行指定。 默认情况下在计数扫描上启用缓存默认缓存大小为10行。 行数为 3000W 的表测试结果 hbase(main):001:0 count sda_crm_calls20180102默认INTERVAL为1000行时花了80分钟。。 hbase(main):001:0 count sda_crm_calls20180102, INTERVAL 1000000INTERVAL为1000000行时花了130分钟。。 二、scan方式设置过滤器循环计数JAVA实现 这种方式是通过添加 FirstKeyOnlyFilter 过滤器的scan进行全表扫描循环计数RowCount速度较慢 但快于第一种count方式 基本代码如下 public void rowCountByScanFilter(String tablename){long rowCount 0;try {//计时StopWatch stopWatch new StopWatch();stopWatch.start();TableName nameTableName.valueOf(tablename);//connection为类静态变量Table table connection.getTable(name);Scan scan new Scan();//FirstKeyOnlyFilter只会取得每行数据的第一个kv提高count速度scan.setFilter(new FirstKeyOnlyFilter());ResultScanner rs table.getScanner(scan);for (Result result : rs) {rowCount result.size();}stopWatch.stop();System.out.println(RowCount: rowCount);System.out.println(统计耗时 stopWatch.getTotalTimeMillis());} catch (Throwable e) {e.printStackTrace();}
}耗时45分钟 三、利用hbase.RowCounter包执行MR任务
这种方式效率非常高利用了hbase jar中自带的统计行数的工具类
通过 $HBASE_HOME/bin/hbase 命令执行
[rootcdh1 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter sda_crm_calls20180102耗时1m40s速度较上面两种有了质的飞跃 四、利用HBase协处理器CoprocessorJAVA实现
这是我目前发现效率最高的RowCount统计方式利用了HBase高级特性协处理器
我们往往使用过滤器来减少服务器端通过网络返回到客户端的数据量。但HBase中还有一些特性让用户甚至可以把一部分计算也移动到数据的存放端那就是协处理器 coprocessor。
协处理器简介
节选自《HBase权威指南》
使用客户端API配合筛选机制例如使用过滤器或限制列族的范围都可以控制被返回到客户端的数据量。如果可以更进一步优化会更好例如数据的处理流程直接放到服务器端执行然后仅返回一个小的处理结果集。这类似于一个小型的MapReduce框架该框架将工作分发到整个集群。
协处理器 允许用户在region服务器上运行自己的代码更准确地说是允许用户执行region级的操作并且可以使用与RDBMS中触发器trigger类似的功能。在客户端用户不用关心操作具体在哪里执行HBase的分布式框架会帮助用户把这些工作变得透明。
实现代码
public void rowCountByCoprocessor(String tablename){try {//提前创建connection和confAdmin admin connection.getAdmin();TableName nameTableName.valueOf(tablename);//先disable表添加协处理器后再enable表admin.disableTable(name);HTableDescriptor descriptor admin.getTableDescriptor(name);String coprocessorClass org.apache.hadoop.hbase.coprocessor.AggregateImplementation;if (! descriptor.hasCoprocessor(coprocessorClass)) {descriptor.addCoprocessor(coprocessorClass);}admin.modifyTable(name, descriptor);admin.enableTable(name);//计时StopWatch stopWatch new StopWatch();stopWatch.start();Scan scan new Scan();AggregationClient aggregationClient new AggregationClient(conf);System.out.println(RowCount: aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));stopWatch.stop();System.out.println(统计耗时 stopWatch.getTotalTimeMillis());} catch (Throwable e) {e.printStackTrace();}
} 发现只花了 23秒 就统计完成
为什么利用协处理器后速度会如此之快
Table注册了Coprocessor之后在执行AggregationClient的时候会将RowCount分散到Table的每一个Region上Region内RowCount的计算是通过RPC执行调用接口由Region对应的RegionServer执行InternalScanner进行的。
因此性能的提升有两点原因:
1.分布式统计。将原来客户端按照Rowkey的范围单点进行扫描然后统计的方式换成了由所有Region所在RegionServer同时计算的过程。
2.使用了在RegionServer内部执行使用了InternalScanner。这是距离实际存储最近的Scanner接口存取更加快捷。