HBase_HBase的runsql协处理器实战与sql聚合计算原理

HBase_HBase的runsql协处理器实战与sql聚合计算原理

一、runsql客户端实战

1.通过sql获取返回结果的统一方法

在该方法内部解析出sql中的各个子句字段:

/**
 * Runs the configured SQL statement through the runsql coprocessor and returns its result.
 * <p>
 * On first use (while {@code tableName} is still unset) the statement is parsed into its
 * clauses. A client for the target table is then obtained and the scan boundaries are adjusted:
 * the start key is advanced when the start bound is exclusive, and the end key is advanced when
 * the end bound is inclusive. Finally the query is dispatched to:
 * <ul>
 *   <li>an aggregate query when {@code IS_AGG} is set;</li>
 *   <li>a plain scan when there is no ORDER BY, calculation, DISTINCT or WHERE;</li>
 *   <li>a simple query otherwise.</li>
 * </ul>
 * NOTE(review): {@code startKey}/{@code endKey} are fields that are advanced in place, so calling
 * this method twice on the same instance advances them twice. Confirm callers use it once.
 *
 * @param area optional area passed to {@code ClientUtils.intance}; null or blank selects the
 *             single-argument overload
 * @return the query result produced by the client
 * @throws RuntimeException if no SQL statement has been set
 * @throws Throwable any failure from parsing or from the client call (logged, then rethrown)
 */
public ResultSet getResultSet(String area) throws Throwable {
    if (sql == null) {
        LOG.error("havn't set the sql sentence,the sql is:" + sql);
        throw new RuntimeException("sql is " + sql);
    }
    try {
        if (tableName == null) {
            ParseElem(GetSqlElem(sql));
        }
        if (null == area || area.trim().equals("")) {
            client = ClientUtils.intance(tableName);
        } else {
            client = ClientUtils.intance(area, tableName);
        }
        // Test emptiness by length: `key != new byte[0]` compares references and is always true,
        // which would also push an empty (unbounded) key through getNextByte.
        if (!CONTAIN_START && startKey != null && startKey.length > 0) {
            startKey = CoEngin.getNextByte(startKey);
        }
        if (CONTAIN_END && endKey != null && endKey.length > 0) {
            endKey = CoEngin.getNextByte(endKey);
        }
        LOG.info(String.format("Runsql execute startKey:[%s] endKey :[%s] sql:%s", startKeyStr, endKeyStr, sql));
        if (IS_AGG) {
            return client.aggregateQuery(startKey, endKey, aggsel, tableName, where, groupby, having, orderby, TopN);
        }
        // Nothing to filter, compute, de-duplicate or sort: a raw scan is sufficient.
        if (orderby == null && !HAS_CAL && !IS_DISTINCT && where == null) {
            return client.scan(tableName, startKey, endKey, null, select, TopN);
        }
        return client.simpleQuery(startKey, endKey, select, tableName, where, orderby, TopN);
    } catch (Throwable e) {
        // Pass the throwable so the stack trace is logged, not just its toString().
        LOG.error("unsuccess result: " + e, e);
        throw e;
    }
}

2.客户端调用协处理器方法

此处选择最复杂的aggregateQuery作为案例:

1)按定义好的protobuf协议,组装好协处理器的Call对象实例;

2)HBase提供的协处理器调用接口coprocessorService()有两个重载方法:一个不需要传入回调(Batch.Callback)对象,该方法会把各个Region上的处理结果以Map(Region名称→结果)的形式返回;另一个需要传入回调对象,客户端每收到一个Region的处理结果,就会调用回调对象的update()方法,按其中的逻辑进行数据合并。如下代码中采用了第二种方法,创建了一个Accumulator回调类,实现了不同Region处理结果的合并逻辑。

/**
 * Runs an aggregate query over the rows in [start, end) through the QueryService coprocessor
 * endpoint and merges the per-region partial results on the client.
 * <p>
 * For DISTINCT queries the ORDER BY list is replaced with one ascending entry per selected
 * aggregate column. After all regions have answered, averages and calculations are finalised,
 * and HAVING, ORDER BY and top-N are applied.
 *
 * @param start          first row key of the range (inclusive)
 * @param end            end row key of the range
 * @param aggSelect      aggregate select list
 * @param fromTable      table name
 * @param whereCondition WHERE clause, or null
 * @param groupbys       GROUP BY clause, or null
 * @param having         HAVING clause, or null
 * @param orderbys       ORDER BY list, or null (overridden for DISTINCT)
 * @param top            row limit passed to doHavingAndOrderBy
 * @return the merged result, or null if validation fails
 * @throws Throwable if the table cannot be obtained, a region call fails, or merging fails
 */
public ResultSet aggregateQuery(final byte[] start, final byte[] end,
        final AggSelect aggSelect, final String fromTable,
        final Where whereCondition, // where
        final GroupBy groupbys, // group by
        final Having having, List<OrderBy> orderbys, final int top)
        throws Throwable {
    MetaData tmeta = getMetaInfoFromCM(fromTable);
    if (tmeta == null) {
        tmeta = getMetaInfo(fromTable);
    }
    if (aggSelect.isDistinct()) { // add by xinrong for distinct
        orderbys = new ArrayList<OrderBy>();
        for (Aggregate col : aggSelect.getColumns()) {
            orderbys.add(asc(col));
        }
    }
    final List<OrderBy> finalOrderBys = null == orderbys ? new ArrayList<OrderBy>() : orderbys;
    final MetaData meta = tmeta;
    if (!validate(null, aggSelect, whereCondition, groupbys, having, orderbys, meta)) {
        LOG.error("vadliation failed");
        return null;
    }

    // Let a failure to obtain the table propagate instead of printing it and failing later
    // with a NullPointerException.
    Table table = pool.getTable(fromTable);

    Batch.Call<QueryProto.QueryService, QueryProto.ResultSet> callable =
        new Batch.Call<QueryProto.QueryService, QueryProto.ResultSet>() {
            @Override
            public QueryProto.ResultSet call(QueryProto.QueryService instance) throws IOException {
                // This callable is invoked concurrently, once per region. The controller and
                // callback therefore must be per invocation; shared fields would let one
                // region's result or failure be observed by another.
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<QueryProto.ResultSet> rpcCallback =
                    new BlockingRpcCallback<QueryProto.ResultSet>();

                QueryProto.AggregateRequest.Builder builder = QueryProto.AggregateRequest.newBuilder();
                builder.setStartKey(ByteString.copyFrom(start));
                builder.setEndKey(ByteString.copyFrom(end));
                builder.setAggSelect(aggSelect.getProto());
                builder.setTableName(fromTable);
                if (null != whereCondition) {
                    builder.setWhere(whereCondition.getProto());
                }
                if (null != groupbys) {
                    builder.setGroupBy(groupbys.getProto());
                }
                if (null != having) {
                    builder.setHaving(having.getProto());
                }
                for (OrderBy orderBy : finalOrderBys) {
                    builder.addOrderbys(orderBy.getProto());
                }
                builder.setMeta(meta.getProto());

                // RPC call
                instance.aggregateQuery(controller, builder.build(), rpcCallback);
                if (controller.failed()) {
                    // Fail the whole query; silently skipping a region would yield wrong totals.
                    IOException cause = controller.getFailedOn();
                    throw cause != null ? cause : new IOException(controller.errorText());
                }
                return rpcCallback.get();
            }
        };

    /** Merges each region's partial result into one client-side ResultSet. */
    class Accumulator implements Batch.Callback<QueryProto.ResultSet> {

        private final ResultSet finalRS = new ResultSet();

        // Called from coprocessorService worker threads, once per region.
        @Override
        public synchronized void update(byte[] region, byte[] row, QueryProto.ResultSet result) {
            if (result == null) {
                return;
            }
            ResultSet tempRS = new ResultSet();
            tempRS.setProto(result);
            for (int i = 0; i < tempRS.size(); i++) {
                finalRS.accumulate(tempRS.get(i));
            }
        }

        public synchronized ResultSet getResult() {
            return finalRS;
        }
    }

    Accumulator accumulator = new Accumulator();
    try {
        table.coprocessorService(QueryProto.QueryService.class, start, end, callable, accumulator);
    } finally {
        table.close();
    }
    ResultSet result = accumulator.getResult();
    doAverageAndCaculation(result, aggSelect);
    return doHavingAndOrderBy(result, having, finalOrderBys, top);
}

二、sql聚合计算原理

runsql协处理器中实现了select、where、aggSel、groupby、having、orderby、limit、distinct这些子句,也实现了sum、avg、count、max、min这些聚合函数。

1.使用RegionScanner读取目标列

使用RegionScanner按请求中的startkey和endkey读取该rowkey范围内的数据,解析传入的sql子句,过滤获取所有需要用到的列:

/**
 * Coprocessor endpoint: runs the aggregate part of a query over this region and returns the
 * region's partial result through {@code done}.
 * <p>
 * The requested key range is scanned, restricted to the columns referenced by the query (or all
 * metadata columns when none are referenced). Each row is fed through internalAggQuery.
 * On an IOException the error is logged, the controller is marked failed, and {@code done}
 * receives null.
 *
 * @param controller RPC controller used to report failures
 * @param request    serialized query (range, select list, optional clauses, table metadata)
 * @param done       callback that receives the region's result set
 */
@Override
public void aggregateQuery(RpcController controller, QueryProto.AggregateRequest request,
        RpcCallback<QueryProto.ResultSet> done) {
    byte[] start = request.getStartKey().toByteArray();
    byte[] end = request.getEndKey().toByteArray();

    AggSelect aggSelect = new AggSelect();
    aggSelect.setProto(request.getAggSelect());
    // Optional sub-messages must be detected with has*(): get*() on an unset field returns a
    // default instance, and isInitialized() on that instance can still be true.
    Where whereCondition = null;
    if (request.hasWhere()) {
        whereCondition = new Where();
        whereCondition.setProto(request.getWhere());
    }
    GroupBy groupbys = null;
    if (request.hasGroupBy()) {
        groupbys = new GroupBy();
        groupbys.setProto(request.getGroupBy());
    }
    Having having = null;
    if (request.hasHaving()) {
        having = new Having();
        having.setProto(request.getHaving());
    }
    List<OrderBy> orderbys = new ArrayList<>();
    for (QueryProto.OrderBy orderByProto : request.getOrderbysList()) {
        OrderBy orderBy = new OrderBy();
        orderBy.setProto(orderByProto);
        orderbys.add(orderBy);
    }
    String fromTable = request.getTableName();

    MetaData meta = new MetaData();
    meta.setProto(request.getMeta());

    QueryProto.ResultSet response = null;
    long time = System.currentTimeMillis();

    RegionScanner scanner = null;
    ResultSet rs = new ResultSet();
    try {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        scan.setStartRow(start);
        scan.setStopRow(end);
        // prepare scan object
        List<String> colNames = Parser.extractColumnNames(null, aggSelect, whereCondition,
                groupbys, having, orderbys);
        if (colNames.size() == 0) {
            colNames.addAll(meta.getColumns());
        }

        try {
            for (String colName : colNames) {
                // Column names are "family:qualifier".
                String[] tmp = colName.split(":");
                scan.addColumn(Bytes.toBytes(tmp[0]), Bytes.toBytes(tmp[1]));
            }
        } catch (Exception e) {
            StringBuilder sb = new StringBuilder();
            sb.append(aggSelect).append("\n");
            sb.append(whereCondition).append("\n");
            sb.append(groupbys).append("\n");
            sb.append(having).append("\n");
            sb.append(orderbys).append("\n");
            // Report the full query context that was collected, and keep the cause.
            throw new RuntimeException("server error log: " + sb, e);
        }

        scanner = env.getRegion().getScanner(scan);
        IReader reader = new RegionScannerReader(scanner, meta, null, colNames);
        internalAggQuery(aggSelect.getColumns(), fromTable, whereCondition, groupbys, having, orderbys, meta,
                rs, reader, aggSelect.isDistinct());
        response = rs.getProto();
    } catch (IOException e) {
        // Do not swallow: log it and mark the RPC as failed.
        // NOTE(review): on HBase 1.x, ResponseConverter.setControllerException(controller, e)
        // also attaches the exception so the client receives it; setFailed(String) alone may not.
        LOG.error("aggregateQuery failed on table " + fromTable, e);
        controller.setFailed(e.toString());
    } finally {
        if (scanner != null) {
            try {
                scanner.close();
            } catch (IOException e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("query use time :" + (System.currentTimeMillis() - time));
    }
    done.run(response); // protobuf response
}

2.逐行进行数据处理

对每一行数据按步骤进行聚合计算处理:

1)添加聚合列,对于aggSel、orderby、having子句中,带有sum、count、max、min聚合函数的指标字段都需要新增一列,对于avg则需要新增一个sum列和一个count列;

2)对于groupby子句,需要添加一个由所有groupby字段值拼接而成、用于分组去重的特殊agg标识列;

3)对于distinct子句,也需要添加一个由所有distinct字段值拼接而成、用于去重的特殊distinct标识列;

4)将该行数据添加到结果集中并进行计算:根据agg标识列判断该行是否属于新的分组;若分组已存在,则按不同聚合函数的逻辑合并到已有行中;若为新分组,则新增一行,并根据orderby子句或distinct字段按二分法找到插入位置。客户端合并不同Region的返回结果时,Accumulator回调的update()方法调用的也是同一个ResultSet.accumulate()方法,因此与此处的处理逻辑相同。

/**
 * Reads every row from {@code reader}, keeps those accepted by the WHERE clause, decorates each
 * kept row with its aggregate, group-by-key and distinct-key columns, and folds it into
 * {@code rs} via {@code rs.accumulate}.
 *
 * @param aggColumns     aggregates from the select list
 * @param fromTable      table name (not used here)
 * @param whereCondition WHERE clause passed to Matcher.accept
 * @param groupbys       GROUP BY clause, or null for a single global group
 * @param having         HAVING clause; its aggregates are materialised on each row
 * @param orderbys       ORDER BY list (for DISTINCT queries, one entry per selected column)
 * @param meta           table metadata (not used here)
 * @param rs             result set receiving the accumulated rows
 * @param reader         source of rows; null marks the end
 * @param isDistinct     whether a DISTINCT key column must be added
 * @throws IOException if reading a row fails
 */
public void internalAggQuery(List<Aggregate> aggColumns,
        String fromTable,
        Where whereCondition,
        GroupBy groupbys,
        Having having,
        List<OrderBy> orderbys,
        MetaData meta,
        ResultSet rs, IReader reader, boolean isDistinct) throws IOException {
    Row currentRow;
    while ((currentRow = reader.next()) != null) {
        // process where condition
        if (!Matcher.accept(whereCondition, currentRow)) {
            continue;
        }
        // generate aggregate columns and put them into the current row
        for (Aggregate aggrgt : aggColumns) {
            generateAggregate(currentRow, aggrgt);
        }
        for (OrderBy ob : orderbys) {
            if (ob.getAggregate() != null) {
                generateAggregate(currentRow, ob.getAggregate());
            }
        }
        generateAggregateFromHaving(currentRow, having);

        if (groupbys != null) {
            currentRow.addColumn(new Column(Column.SPLICE_KEY_COL_NAME,
                    buildGroupKey(currentRow, groupbys.getColumns()), Column.DataType.AGGKEY));
        }

        // add by xinrong on 2012.11.5
        if (isDistinct) {
            currentRow.addColumn(new Column(Column.DISTINCT_COL,
                    buildDistinctKey(currentRow, orderbys), Column.DataType.DTCKEY));
        }

        rs.accumulate(currentRow);
    }
}

/**
 * Builds the group-by key: each GROUP BY value is written as a 4-byte length followed by its
 * bytes.
 * <p>
 * Length-prefixing makes the encoding unambiguous. With plain concatenation, ("ab","c") and
 * ("a","bc") would collapse into the same group. A column missing from the row is written as
 * length -1, so it neither throws nor collides with an empty value.
 */
private byte[] buildGroupKey(Row row, List<String> grpColumns) {
    byte[] key = new byte[0];
    for (String name : grpColumns) {
        Column col = row.getColumn(name);
        byte[] value = col == null ? null : col.getValue();
        if (value == null) {
            key = Bytes.add(key, Bytes.toBytes(-1));
        } else {
            key = Bytes.add(key, Bytes.toBytes(value.length), value);
        }
    }
    return key;
}

/**
 * Builds the DISTINCT key by concatenating the row's ORDER BY aggregate columns. For DISTINCT
 * queries these are the selected columns.
 * NOTE(review): plain concatenation can make different value tuples share a key; the format is
 * kept as-is here because how DISTINCT_COL bytes are consumed elsewhere is not visible.
 */
private byte[] buildDistinctKey(Row row, List<OrderBy> orderbys) {
    byte[] key = new byte[0];
    for (OrderBy ob : orderbys) {
        key = Bytes.add(key, row.getColumn(ob.getAggregate().getFakeName()).getValue());
    }
    return key;
}

3.添加聚合列

对sum、count、max、min聚合函数字段新增一个对应结果存储列;对于avg函数字段新增两个结果暂存列,一个sum列和一个count列:

/**
 * Adds the working column(s) for one aggregate expression to a row.
 * <p>
 * The new column is named {@code aggrgt.getFakeName()}. It is seeded with the source column's
 * value and type and tagged with the aggregate type.
 * <p>
 * For AVG, two helper columns are added first (each only if absent), because avg = sum / count:
 * <ul>
 *   <li>a sum helper named {@code Aggregate.PREFIX_SUM + column name};</li>
 *   <li>a count helper named {@code Aggregate.PREFIX_COUNT}, shared by every AVG in the row.</li>
 * </ul>
 * Nothing is added when either argument is null, when the row lacks the source column, or when
 * the aggregate column already exists.
 * NOTE(review): the count helper is seeded with the source value, not 1. Confirm that
 * Column/setAggType handles COUNT initialisation.
 *
 * @param currentRow row to decorate
 * @param aggrgt     aggregate expression to materialise
 */
private void generateAggregate(Row currentRow, Aggregate aggrgt) {
    if (currentRow == null || aggrgt == null) {
        return;
    }
    final String sourceName = aggrgt.getColName();
    final Column source = currentRow.getColumn(sourceName);
    if (source == null) {
        return;
    }
    final String fakeName = aggrgt.getFakeName();
    if (currentRow.hasColumn(fakeName)) {
        return;
    }
    if (aggrgt.getType() == Expression.AGG_AVG) {
        final String sumName = Aggregate.PREFIX_SUM + sourceName;
        if (!currentRow.hasColumn(sumName)) {
            final Column sumHelper = new Column(sumName, source.getValue(), source.getType());
            sumHelper.setAggType(Aggregate.AGG_SUM);
            currentRow.addColumn(sumHelper);
        }
        if (!currentRow.hasColumn(Aggregate.PREFIX_COUNT)) {
            final Column countHelper = new Column(Aggregate.PREFIX_COUNT, source.getValue(), source.getType());
            countHelper.setAggType(Aggregate.AGG_COUNT);
            currentRow.addColumn(countHelper);
        }
    }
    final Column aggColumn = new Column(fakeName, source.getValue(), source.getType());
    aggColumn.setAggType(aggrgt.getType());
    currentRow.addColumn(aggColumn);
}

4.添加groupby标识列

将所有groupby字段值拼接成一个用于分组去重的标识列:

// Build the group-by key column by concatenating the raw bytes of every GROUP BY column value,
// then attach it to the row as SPLICE_KEY_COL_NAME (type AGGKEY).
// NOTE(review): plain concatenation is ambiguous. ("ab","c") and ("a","bc") give the same key and
// would be merged into one group; a length-prefixed encoding avoids this.
// NOTE(review): Row.getColumn can return null (generateAggregate checks for it), in which case
// getValue() below throws a NullPointerException for a row missing a GROUP BY column.
if (groupbys != null) {
//splice group by columns
byte[] buff = new byte[0];
List<String> grpColumns = groupbys.getColumns();
for (int i=0;i<grpColumns.size();++i) {
buff = Bytes.add(buff, currentRow.getColumn(grpColumns.get(i)).getValue());
}
currentRow.addColumn(new Column(Column.SPLICE_KEY_COL_NAME,
buff, Column.DataType.AGGKEY));
}

5.添加distinct标识列

将所有distinct字段值拼接成一个用于去重的标识列:

// For DISTINCT queries the client replaces ORDER BY with one ascending entry per selected column,
// so concatenating the ORDER BY aggregates' fake-name column values yields the DISTINCT key.
// That key is attached to the row as DISTINCT_COL (type DTCKEY).
// NOTE(review): as with the group-by key, plain concatenation can make different value tuples
// produce the same key.
if (isDistinct) {
//splice orderby(the same as aggsel) column by columns
byte[] buff = new byte[0];
for (int i=0;i<orderbys.size();++i) {
buff = Bytes.add(buff, currentRow.getColumn(orderbys.get(i).getAggregate().getFakeName()).getValue());
}
currentRow.addColumn(new Column(Column.DISTINCT_COL,
buff, Column.DataType.DTCKEY));
}

6.去重与聚合计算

1)检查结果集中groupby去重标识列值是否已存在,已存在则需要将该行数据按照聚合计算逻辑merge到已存在行中;不存在则直接将该行插入到结果集中,对于存在orderby或distinct子句的查询则需要根据二分法找到插入位置:

/**
 * Folds one decorated row into this result set.
 * <p>
 * A row carrying a group-by key column ({@code Column.SPLICE_KEY_COL_NAME}) is handled by key:
 * the lookup uses {@code Bytes.hashCode} of the MD5 of the key bytes. If a row for that key is
 * already stored, the new row's aggregate columns are merged into it; otherwise the row is stored
 * and indexed.
 * <p>
 * Rows without a group-by key all merge into the first stored row.
 *
 * @param currentRow the row to add or merge
 */
public synchronized void accumulate_VersionID_1(Row currentRow) {
    Column col = currentRow.getColumn(Column.SPLICE_KEY_COL_NAME);
    if (LOG.isDebugEnabled()) {
        LOG.debug("accumulate the row: " + currentRow.getKeyString());
    }
    if (col != null) {
        // NOTE(review): groups are indexed by a 32-bit hash of the MD5 digest, so two different
        // groups can collide and be merged; consider indexing on the full digest.
        int hashKey = Bytes.hashCode(MD5Key(col.getValue()));
        if (LOG.isDebugEnabled()) {
            LOG.debug("current group by key: " + hashKey);
        }
        if (hasAggValue(hashKey)) {
            Integer pos = getAggValuePosition(hashKey);
            mergeAggColumns(get(pos), currentRow);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("add a group by key: " + hashKey);
            }
            // reverse index: group hash -> position of the stored row for that group
            aggvalue.put(hashKey, size());
            add(currentRow);
        }
    } else if (rows.size() == 0) {
        // no group by columns: the first row becomes the single aggregate row
        add(currentRow);
    } else {
        mergeAggColumns(rows.get(0), currentRow);
    }
}

/**
 * Accumulates every aggregate column of {@code from} into {@code saveTo}.
 * <p>
 * If {@code saveTo} has no such column, the incoming column is adopted. That happens when the
 * stored row's source column was absent when it was first added. The earlier code threw a
 * NullPointerException (group-by path) or dropped the value (no-group-by path) in this case.
 */
private void mergeAggColumns(Row saveTo, Row from) {
    for (String name : from.getAggColumns()) {
        Column source = from.getColumn(name);
        Column target = saveTo.getColumn(name);
        if (target != null) {
            target.accumulate(source);
        } else if (source != null) {
            saveTo.addColumn(source);
        }
    }
}
	/**
	 * Merges another partial value of the same aggregate into this column's value.
	 * <ul>
	 *   <li>COUNT and SUM add the incoming value via internalAdd using this column's type.</li>
	 *   <li>MAX and MIN keep the larger or smaller value as ordered by Bytes.compareTo.</li>
	 *   <li>Any other aggregate type leaves the value unchanged. AVG is not merged here; it is
	 *       derived from its sum/count helper columns.</li>
	 * </ul>
	 * A null argument is ignored.
	 * NOTE(review): MAX/MIN compare raw bytes; confirm this matches numeric order for the value
	 * encodings stored in these columns.
	 *
	 * @param col the partial value to merge; may be null
	 */
	public void accumulate(Column col) {
	    if (col == null) {
	        return;
	    }
	    switch (aggType) {
	        case Expression.AGG_COUNT:
	        case Expression.AGG_SUM:
	            value = internalAdd(value, col.getValue(), type);
	            break;
	        case Expression.AGG_MAX: {
	            byte[] candidate = col.getValue();
	            if (Bytes.compareTo(value, candidate) < 0) {
	                value = candidate;
	            }
	            break;
	        }
	        case Expression.AGG_MIN: {
	            byte[] candidate = col.getValue();
	            if (Bytes.compareTo(value, candidate) > 0) {
	                value = candidate;
	            }
	            break;
	        }
	        default:
	            // AVG and non-aggregate columns: nothing to merge
	            break;
	    }
	}