HBase_HBase表行数统计实战

HBase_HBase表行数统计实战

一、HBase统计表行数的常见四种方法

1.hbase-shell的count命令

这是最简单直接的操作,但是执行效率非常低,适用于百万级以下的小表RowCount统计。本质就是scan,扫描3000w行数据耗时达到130min。

2.java代码请求带过滤器的scan接口

这是通过在scan中添加FirstKeyOnlyFilter类型的过滤器来实现的,本质就是每一行数据只扫描第一列,相比于第一种方法有两个优点:

1.自行编写扫表java代码工具,可以自定义一些想要扫描的rowkey范围、时间戳范围、指定列,较为灵活;

2.扫描传输数据量成倍减少,花费时间成倍减少。

扫描3000w行数据使用45min。

3.利用hbase.RowCounter包执行MR任务

RowCounter也就是 HBase 中的 org.apache.hadoop.hbase.mapreduce.RowCounter 这个类,这个类使用了MapReduce框架来实现并行计算表中的行数。

通过 $HBASE_HOME/bin/hbase 命令即可执行:

1
[root@cdh1 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter tableName

本质就是启动一个mapreduce任务分布式读取全表数据,map中就是一个简单的计数器,reduce就是将map输出结果进行累加。如果想要实现一些特殊的过滤计数,也可以自己实现一个类似的mapreduce任务,在map中添加相应的过滤逻辑。

这种MR的方式进行计数优点是非常快,扫描3000w行数据只需要100s;缺点是需要部署hadoop集群和mapreduce引擎,且任务执行时对线上表压力较大。

4.利用hbase协处理器

Coprocessor协处理器的本质就是利用分布式运算,减少客户端与服务器之前的数据传输。

HBase提供的org.apache.hadoop.hbase.coprocessor.AggregateImplementation协处理器中提供了一些基本简单运算接口,比如其中的rowCount()方法就是在每个rs上分别计算行数,然后将每个节点上的行数返回客户端累加即可得到总行数。

这种协处理器计数速度最快,扫描3000w行数据只需要23s;缺点是安装协处理器时,需要先将目标表置为disable状态,如果是线上表会影响查询,需要在建表时就装上,或者通过主备切换轮流安装。

参考文献:HBase统计表行数的四种方法

二、使用MR与过滤器scan实战

HBase封装好了一些通过MR扫描全表或者快照表的方法(Scan读取Hbase表与Hbase表快照),如使用TableMapReduceUtil.initTableMapperJob(),可以直接利用该方法结合scan,进行分布式scan。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package ads.filter;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import java.io.IOException;

public class RowCounterTest {
public RowCounterTest() {
}

public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();

configuration.set("bdp.hbase.accesskey","MZYH5UIKEY3BVC3SNDRYU7ZB3A");
configuration.set("bdp.hbase.instance.name","SL1000000002941");
configuration.set("hbase.client.retries.number","3");
configuration.set("hbase.rpc.timeout","3600000");
configuration.set("hbase.client.operation.timeout","3600000");

configuration.set("mapreduce.map.memory.mb","8192");
configuration.set("mapreduce.map.java.opts","-Xmx6144M");
configuration.set("mapreduce.map.cpu.vcores","2");
configuration.set("mapreduce.reduce.memory.mb","8192");
configuration.set("mapreduce.reduce.java.opts","-Xmx6144M");
configuration.set("mapreduce.reduce.cpu.vcores","4");
configuration.set("yarn.app.mapreduce.am.resource.mb","8192");
configuration.set("yarn.app.mapreduce.am.command-opts","-Xmx6144m");
configuration.set("yarn.app.mapreduce.am.resource.cpu-vcores","3");
configuration.set("mapreduce.task.io.sort.mb","1024");
configuration.set("mapreduce.job.reduce.slowstart.completedmaps","1");

Job job = createSubmittableJob(configuration, args);

Long startTime = System.currentTimeMillis();
job.waitForCompletion(true);
Long endTime = System.currentTimeMillis();

if (job.isSuccessful()){
long costTime = (endTime - startTime) / 1000;
System.out.println("MR任务执行时间:" + costTime + " s!");
System.out.println("计算行数:" + job.getCounters().findCounter(RowCounterTest.RowCounterMapper.Counters.ROWS).getValue() + " 行!");
}else {
System.out.println("行数计算MR任务失败!");
}
}

public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {

String tableName = args[0];
Job job = new Job(conf, "rowcounter_" + tableName);
job.setJarByClass(RowCounterTest.class);

Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);

//添加扫描的列簇和列范围限制。
String family = StringUtils.substringBefore(args[1], "_");
String qualifier = StringUtils.substringAfter(args[1], "_");
if ("null".equals(qualifier)) {
scan.addFamily(Bytes.toBytes(family));
}else {
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
}

//设置扫描的时间戳范围限制。
long startTime = Long.parseLong(StringUtils.substringBefore(args[2], "_"));
long endTime = Long.parseLong(StringUtils.substringAfter(args[2], "_"));
scan.setTimeRange(startTime, endTime == 0L ? 9223372036854775807L : endTime);//9223372036854775807L为long类型最大值。

//这个过滤器返回它找到的行中的第一个cell,后续的其他列簇和列的cell就不用再扫描了,这么做是为了减少扫描时间。
scan.setFilter(new FirstKeyOnlyFilter());

job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterTest.RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(0);
return job;
}

static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {
RowCounterMapper() {
}

public void map(ImmutableBytesWritable row, Result values, Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context) throws IOException {
context.getCounter(RowCounterTest.RowCounterMapper.Counters.ROWS).increment(1L);
}

//在Mapper中创建一个内部静态枚举类,该枚举类可以被job的所有map任务获取到,实现所有map累加。
public static enum Counters {
ROWS;
}
}
}

三、使用协处理器实战

要将hbase表置于disable需要ddl权限,所以一般不会把协处理器的安装与使用放在一起,所以如下实战代码不能用于线上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package ads.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;

public class AgreementTest {
public static void main(String[] args) {

Configuration configuration = HBaseConfiguration.create();

configuration.set("bdp.hbase.accesskey","MZYH5UIKEY3BVC3SNDRYU7ZB3A");
configuration.set("bdp.hbase.instance.name","SL1000000002941");
configuration.set("hbase.client.retries.number","3");
configuration.set("hbase.rpc.timeout","3600000");
configuration.set("hbase.client.operation.timeout","3600000");

TableName tableName = null;
Admin admin = null;
Connection connection = null;

try {
tableName = TableName.valueOf("hb_ibd:HB_IBD_PRO_ANALYSIS_FLOW_RECENT_NEW");
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();

admin.disableTable(tableName);
HTableDescriptor descriptor = admin.getTableDescriptor(tableName);
String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
if (! descriptor.hasCoprocessor(coprocessorClass)) {
descriptor.addCoprocessor(coprocessorClass);
}
admin.modifyTable(tableName, descriptor);
admin.enableTable(tableName);

Scan scan = new Scan();

AggregationClient ac = new AggregationClient(configuration);

long rowcount = ac.rowCount(tableName, new LongColumnInterpreter(), scan);

System.out.println("行数计算结果:" + rowcount);

} catch (Throwable e) {
e.printStackTrace();
}
}
}