HBase_HBase的runsql协处理器实战与sql聚合计算原理
一、runsql客户端实战
1.通过sql获取返回结果的统一方法
在该方法内部解析处sql中的各个子句字段:
1 | public ResultSet getResultSet(String area) throws Throwable{ |
2.客户端调用协处理器方法
此处选择最复杂的aggregateQuery作为案例:
1)按定义好的protobuf协议,组装好协处理器的Call对象实例;
2)HBase提供的协处理器调用接口coprocessorService()有两个重载方法:一个不需要传入merger类对象,该方法会将不同RS节点上的处理结果以list的形式返回;一个需要传入merger类对象,会在返回结果时将所有RS节点上处理结果传输到一个节点上,按merger对象中的逻辑进行数据合并。如下代码中采用了第二种方法,创建了一个merger类,实现了不同RS节点处理结果的合并逻辑。
1 | public ResultSet aggregateQuery(final byte[] start, final byte[] end, |
二、sql聚合计算原理
runsql协处理器中实现了select、where、aggSel、groupby、having、orderby、limit、distinct这些子句,也实现了sum、avg、count、max、min这些聚合函数。
1.使用RegionScanner读取目标列
使用RegionScanner根据Call对象中的startkey和endkey读取范围rowkey,解析传入的sql子句,过滤获取所有需要用到的列:
1 |
|
2.逐行进行数据处理
对每一行数据按步骤进行聚合计算处理:
1)添加聚合列,对于aggSel、orderby、having子句中,带有sum、count、max、min聚合函数的指标字段都需要新增一列,对于avg则需要新增一个sum列和一个count列;
2)对于groupby子句,需要添加一个所有groupby字段值组成的一个用于去重的特殊agg标识列;
3)对于distinct子句,也需要添加一个所有distinct字段值组成的一个用于去重的特殊distinct标识列;
4)将该行数据添加到结果集中并进行计算,根据agg字段判断是否新增,不新增则对不同聚合函数进行合并计算,新增则根据orderby子句或distinct字段按二分法找到插入位置(协处理器方法的最后一个入参merger类对象中的不同RS计算结果merge方法,应该是与该处一样的处理逻辑)。
1 | public void internalAggQuery(List<Aggregate> aggColumns, |
3.添加聚合列
对sum、count、max、min聚合函数字段新增一个对应结果存储列;对于avg函数字段新增两个结果暂存列,一个sum列和一个count列:
1 | /*** |
4.添加groupby标识列
将所有groupby字段值组成的一个用于去重标识列:
1 | if (groupbys != null) { |
5.添加distinct标识列
将所有distinct字段值组成的一个用于去重计数标识列:
1 | if (isDistinct) { |
6.去重与聚合计算
1)检查结果集中groupby去重标识列值是否已存在,已存在则需要将该行数据按照聚合计算逻辑merge到已存在行中;不存在则直接将该行插入到结果集中,对于存在orderby或distinct子句的查询则需要根据二分法找到插入位置:
1 | public synchronized void accumulate_VersionID_1(Row currentRow) { |
1 | public void accumulate(Column col) { |