HBase Table Data Skew Governance_HBase Row Counting

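1. Primary/Standby Consistency Verification

Primary/standby consistency verification for a database usually covers two aspects. The first is data accuracy, which is checked by running the same query against both sides and comparing the results. The second is data volume, which is checked by counting rows, for example the total number of rows or the number of rows within a given time window. JD currently verifies primary/standby consistency for its ClickHouse, HBase, and Druid databases in this way.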
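
The data-volume half of this check can be sketched as a plain comparison of per-window counts. This is only an illustration: it assumes the counts have already been collected from each cluster, and the class name, the window labels, and the numbers are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Compares row counts reported by a primary and a standby cluster, window by window. */
final class CountConsistencyCheck {

    /** Returns the windows whose counts differ, mapped to {primaryCount, standbyCount}. */
    static Map<String, long[]> mismatches(Map<String, Long> primary, Map<String, Long> standby) {
        // Walk the union of window labels so a window missing on one side is reported too.
        TreeSet<String> windows = new TreeSet<>(primary.keySet());
        windows.addAll(standby.keySet());

        Map<String, long[]> diff = new TreeMap<>();
        for (String window : windows) {
            long p = primary.getOrDefault(window, -1L);  // -1 marks "no count available"
            long s = standby.getOrDefault(window, -1L);
            if (p != s) {
                diff.put(window, new long[] {p, s});
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // Hypothetical counts: the full table matches, one day does not.
        Map<String, Long> primary = Map.of("all", 1000L, "2022-07-09", 40L);
        Map<String, Long> standby = Map.of("all", 1000L, "2022-07-09", 38L);
        mismatches(primary, standby).forEach((window, counts) ->
                System.out.println(window + ": primary=" + counts[0] + ", standby=" + counts[1]));
    }
}
```

Counting per time window as well as in total narrows down where the two sides diverge, which a single full-table count cannot do.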

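2. Implementing HBase Row Counting with Filters

2.1 Basic Principle

Counting rows in HBase comes down to scanning the whole table and counting the rows the scan returns, so optimizing a row count comes down to making the scan more efficient.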

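2.2 Optimization Strategy

This article optimizes the scan in three ways:

  • Use Scan.addFamily() and Scan.addColumn() to scan only a single column family or a single column, which shortens the scan. The count then covers only rows that have at least one cell in that family or column.
  • Use Scan.setTimeRange() to scan only the data written within a given time range, so that, for example, the rows written on one particular day can be counted without scanning the whole table.
  • Use FirstKeyOnlyFilter to read only the first cell of each rowkey, which shortens the scan.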

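I studied and summarized the scan flow earlier; it is shown in the figure below:

[Figure: HBase scan flow; the step numbers below refer to it]

The column family restriction applies in step 1: a StoreScanner is created, and data is retrieved, only for the specified column families.

The timestamp restriction applies in steps 3 and 6. Step 3 skips outright any file that holds no data in the requested time range; step 6 compares the timestamp of each cell being scanned against the range.

FirstKeyOnlyFilter applies in step 7.2: once one cell of a rowkey has been scanned, the scan moves straight on to the next row instead of retrieving the next column.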
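
To make the effect of these steps concrete, here is a toy in-memory model. It is not HBase code: the classes ScanPruningSketch, StoreFile, and Cell are hypothetical stand-ins, and the model assumes only what the text states, namely that files are selected by column family, that a file can be skipped when its timestamps fall outside the range, and that a row needs no further cells once it has been counted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the three scan restrictions; the step numbers match the text above. */
final class ScanPruningSketch {

    /** A cell reduced to the two fields the restrictions look at. */
    static final class Cell {
        final String row;
        final long timestamp;
        Cell(String row, long timestamp) { this.row = row; this.timestamp = timestamp; }
    }

    /** A store file: it belongs to one column family and tracks its min/max cell timestamp. */
    static final class StoreFile {
        final String family;
        final List<Cell> cells = new ArrayList<>();
        long minTs = Long.MAX_VALUE;
        long maxTs = Long.MIN_VALUE;
        StoreFile(String family) { this.family = family; }
        StoreFile add(String row, long ts) {
            cells.add(new Cell(row, ts));
            minTs = Math.min(minTs, ts);
            maxTs = Math.max(maxTs, ts);
            return this;
        }
    }

    /** Counts rows of one family that have at least one cell with minTs <= timestamp < maxTs. */
    static long countRows(List<StoreFile> files, String family, long minTs, long maxTs) {
        Set<String> counted = new HashSet<>();
        for (StoreFile file : files) {
            if (!file.family.equals(family)) continue;                // step 1: other families are never opened
            if (file.maxTs < minTs || file.minTs >= maxTs) continue;   // step 3: file lies outside the time range
            for (Cell cell : file.cells) {
                if (counted.contains(cell.row)) continue;             // step 7.2: row already counted
                if (cell.timestamp >= minTs && cell.timestamp < maxTs) { // step 6: per-cell timestamp check
                    counted.add(cell.row);
                }
            }
        }
        return counted.size();
    }

    public static void main(String[] args) {
        List<StoreFile> files = List.of(
                new StoreFile("d").add("r1", 100).add("r1", 100).add("r2", 200),
                new StoreFile("d").add("r3", 900),
                new StoreFile("e").add("r4", 150));
        System.out.println(countRows(files, "d", 0, Long.MAX_VALUE)); // 3
        System.out.println(countRows(files, "d", 150, 300));          // 1
    }
}
```

In the second call the file holding r3 is skipped without reading any of its cells, which is where a narrow time range saves most of the work.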

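2.3 HBase Timestamps and TTL

The version number stored in each cell is simply the timestamp field recorded when the cell was written, in milliseconds. If a value is supplied for this field at write time, that value is used; otherwise the field defaults to the current time at which the data is written.

For example, one cell looks like this: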

rowkey=\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x15\xDF\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07\xD2\x00\x00\x00\x00\x00\x1E\x8CT\x1C\xFC\xCF\x05, column=d:AvgDepthSingle, timestamp=1656880971313, value=(binary, not printable)
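
Since the timestamp is a plain epoch value in milliseconds, it can be decoded with java.time. The sketch below does that for the sample cell's timestamp; the class name is hypothetical, and the Asia/Shanghai zone is an assumption based on the +08:00 offsets that appear in the job logs.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Decodes a cell timestamp (epoch milliseconds) into a readable date. */
final class CellTimestampDemo {

    static ZonedDateTime toZoned(long epochMillis, String zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zone));
    }

    public static void main(String[] args) {
        long ts = 1656880971313L;                          // timestamp of the sample cell
        System.out.println(Instant.ofEpochMilli(ts));      // 2022-07-03T20:42:51.313Z
        System.out.println(toZoned(ts, "Asia/Shanghai"));  // same instant, shown in UTC+8
    }
}
```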

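HBase has a TTL (time to live) attribute that marks how long data stays valid. TTL is set per column family, in seconds, and applies to every cell in that family. (A per-cell TTL set through Mutation.setTTL() is given in milliseconds.)

TTL works by comparing each cell's timestamp with the TTL. For example, with a TTL of 86400 (one day), a query or a compaction decides whether a cell may be returned, or should be cleaned up, by testing currentMilliseconds - timestamp > 86400 * 1000. If the condition holds, the cell is not returned by queries and is removed during compaction.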
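
The expiry test can be written out as a one-line predicate. This is a sketch of the comparison described above, not HBase's internal code; the class and method names are hypothetical, and the TTL is passed in milliseconds so that it can be compared directly with the cell timestamp.

```java
/** The TTL comparison from the text, with every value in milliseconds. */
final class TtlCheck {

    /** True when the cell is older than the TTL and so should no longer be returned. */
    static boolean isExpired(long cellTimestampMs, long ttlMs, long nowMs) {
        return nowMs - cellTimestampMs > ttlMs;
    }

    public static void main(String[] args) {
        long oneDayMs = 86_400_000L;          // one day; equals a column-family TTL of 86400 seconds
        long writtenAt = 1656880971313L;      // timestamp of the sample cell
        System.out.println(isExpired(writtenAt, oneDayMs, writtenAt + oneDayMs));      // false: exactly one day old
        System.out.println(isExpired(writtenAt, oneDayMs, writtenAt + oneDayMs + 1));  // true: one millisecond past
    }
}
```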

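An update in HBase is really the addition of another cell with the same rowkey and column but a different timestamp. A delete writes a delete marker (tombstone) whose timestamp defaults to the current currentMilliseconds; depending on the Delete call, the marker hides either the single latest version or every version whose timestamp is not newer than the marker. A scan returns, for each column, the cell with the largest timestamp inside its time range, which by default is the newest version. In other words, add, update, delete, and scan are all timestamped operations.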
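
The interplay of these timestamps can be modeled for a single column with a sorted map. This is a simplified sketch, not HBase code: the class VersionedCellSketch is hypothetical, and it assumes a column-wide delete marker, that is, one that hides every version whose timestamp is not newer than the marker.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of one column: versions keyed by timestamp, plus one column-wide delete marker. */
final class VersionedCellSketch {
    private final TreeMap<Long, String> versions = new TreeMap<>();
    private long deleteMarkerTs = Long.MIN_VALUE;

    /** Add and update are the same operation: store a value under a timestamp. */
    void put(long timestamp, String value) { versions.put(timestamp, value); }

    /** Writes a delete marker; versions with timestamp <= marker become invisible. */
    void delete(long timestamp) { deleteMarkerTs = Math.max(deleteMarkerTs, timestamp); }

    /** Returns the newest visible value with timestamp <= readTs, or null if there is none. */
    String get(long readTs) {
        Map.Entry<Long, String> newest = versions.floorEntry(readTs);
        if (newest == null || newest.getKey() <= deleteMarkerTs) {
            return null;
        }
        return newest.getValue();
    }

    public static void main(String[] args) {
        VersionedCellSketch cell = new VersionedCellSketch();
        cell.put(100, "v1");
        cell.put(200, "v2");                 // an "update" is just a newer version
        System.out.println(cell.get(250));   // v2
        cell.delete(300);
        cell.put(250, "late");               // custom timestamp older than the delete marker
        System.out.println(cell.get(400));   // null: the late write is hidden by the marker
    }
}
```

The last two calls show why custom timestamps need care: a write that arrives after a delete but carries an older timestamp stays invisible.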

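One special case is supplying a custom timestamp when adding, deleting, updating, or reading data. In particular, a custom timestamp on add makes an individualized TTL possible. Use this with care, though: if data is written with custom timestamps, then update, delete, and scan must also take custom timestamps into account to have the intended effect.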
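
One way to read "individualized TTL" is to backdate the timestamp on write so that a cell expires earlier than the configured TTL would allow. The arithmetic is sketched below under that assumption; the class and method names are hypothetical, and all values are in milliseconds.

```java
/** Computes a backdated write timestamp that shortens a cell's lifetime under a fixed TTL. */
final class BackdatedTimestamp {

    /**
     * Returns the timestamp to write at nowMs so that the cell expires after
     * desiredLifetimeMs, given a TTL of ttlMs. The cell expires once
     * currentTime - timestamp > ttlMs, i.e. once currentTime > nowMs + desiredLifetimeMs.
     */
    static long timestampFor(long nowMs, long ttlMs, long desiredLifetimeMs) {
        if (desiredLifetimeMs < 0 || desiredLifetimeMs > ttlMs) {
            throw new IllegalArgumentException("lifetime must be between 0 and the TTL");
        }
        return nowMs - (ttlMs - desiredLifetimeMs);
    }

    public static void main(String[] args) {
        long now = 1656880971313L;      // example "current time"
        long ttl = 86_400_000L;         // TTL of one day
        long oneHour = 3_600_000L;      // this cell should live one hour only
        System.out.println(timestampFor(now, ttl, oneHour)); // 23 hours before now
    }
}
```

A cell written this way is 23 hours "old" from the start, so later writes, deletes, and time-range scans on the same column have to account for the shifted timestamp.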

2.4 Application Example

2.4.1 Scanning and Counting Rows with MapReduce
import java.io.IOException;

import org.apache.commons.lang.StringUtils; // or org.apache.commons.lang3.StringUtils, depending on the classpath
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class RowCounterTest {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();

        // HBase connection and client settings (credentials masked).
        configuration.set("bdp.hbase.erp", "******");
        configuration.set("bdp.hbase.accesskey", "******");
        configuration.set("bdp.hbase.instance.name", "******");
        configuration.set("hbase.zookeeper.quorum", "******");
        configuration.set("hbase.client.retries.number", "3");
        configuration.set("hbase.rpc.timeout", "3600000");
        configuration.set("hbase.client.operation.timeout", "3600000");

        // MapReduce resource settings.
        configuration.set("mapreduce.map.memory.mb", "8192");
        configuration.set("mapreduce.map.java.opts", "-Xmx6144M");
        configuration.set("mapreduce.map.cpu.vcores", "2");
        configuration.set("mapreduce.reduce.memory.mb", "8192");
        configuration.set("mapreduce.reduce.java.opts", "-Xmx6144M");
        configuration.set("mapreduce.reduce.cpu.vcores", "4");
        configuration.set("yarn.app.mapreduce.am.resource.mb", "8192");
        configuration.set("yarn.app.mapreduce.am.command-opts", "-Xmx6144m");
        configuration.set("yarn.app.mapreduce.am.resource.cpu-vcores", "3");
        configuration.set("mapreduce.task.io.sort.mb", "1024");
        configuration.set("mapreduce.job.reduce.slowstart.completedmaps", "1");

        Job job = createSubmittableJob(configuration, args);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        long endTime = System.currentTimeMillis();

        if (job.isSuccessful()) {
            long costTime = (endTime - startTime) / 1000;
            // Prints the MR job's execution time in seconds.
            System.out.println("MR任务执行时间:" + costTime + " s!");
            // Prints the computed row count, read from the custom ROWS counter.
            System.out.println("计算行数:" + job.getCounters().findCounter(RowCounterMapper.Counters.ROWS).getValue() + " 行!");
        } else {
            // Prints "row count MR job failed".
            System.out.println("行数计算MR任务失败!");
        }
    }

    /**
     * Builds the job. Arguments: args[0] = table name, args[1] = family_qualifier
     * ("null" as the qualifier means the whole family), args[2] = startMillis_endMillis
     * (an end of 0 means no upper bound).
     */
    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        String tableName = args[0];
        Job job = Job.getInstance(conf, "rowcounter_" + tableName);
        job.setJarByClass(RowCounterTest.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false); // a one-off full scan should not pollute the block cache

        // Restrict the scan to one column family, or to one column.
        String family = StringUtils.substringBefore(args[1], "_");
        String qualifier = StringUtils.substringAfter(args[1], "_");
        if ("null".equals(qualifier)) {
            scan.addFamily(Bytes.toBytes(family));
        } else {
            scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }

        // Restrict the scan to a timestamp range; the range is [startTime, endTime).
        long startTime = Long.parseLong(StringUtils.substringBefore(args[2], "_"));
        long endTime = Long.parseLong(StringUtils.substringAfter(args[2], "_"));
        scan.setTimeRange(startTime, endTime == 0L ? Long.MAX_VALUE : endTime);

        // This filter returns only the first cell it finds in each row, so the remaining
        // cells of the row need not be scanned; this shortens the scan.
        scan.setFilter(new FirstKeyOnlyFilter());

        job.setOutputFormatClass(NullOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class,
                ImmutableBytesWritable.class, Result.class, job);
        job.setNumReduceTasks(0); // map-only job: the counter carries the result
        return job;
    }

    static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {

        // An enum declared inside the Mapper; the framework sums the counter named by
        // this enum across all map tasks of the job.
        public enum Counters {
            ROWS
        }

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // Called once per row returned by the scan.
            context.getCounter(Counters.ROWS).increment(1L);
        }
    }
}
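  • In MapReduce, counting is usually implemented by declaring an enum inside the Mapper class and incrementing the corresponding counter; see the code above.

2.4.2 Command Line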

Scan column family d over the whole table, with no time restriction:

hadoop jar RowCounterTest-1.0-SNAPSHOT.jar hb_ibd:HB_IBD_UNBOUNDER_TRADE_BRAND d_null 0_0

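Scan column family d, restricted to 2022-07-09 (UTC+8). The bounds are 00:00:01 and 23:59:59 of that day; because Scan.setTimeRange() treats the range as [min, max), cells written during the first and the last second of the day are not counted: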

hadoop jar RowCounterTest-1.0-SNAPSHOT.jar hb_ibd:HB_IBD_UNBOUNDER_TRADE_BRAND d_null 1657296001000_1657382399000
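
The third argument can be derived from a calendar date instead of being typed by hand. The sketch below assumes the half-open [min, max) convention of Scan.setTimeRange() and the Asia/Shanghai zone; the class and method names are hypothetical. It yields bounds that cover the whole day, from midnight up to (but excluding) the next midnight.

```java
import java.time.LocalDate;
import java.time.ZoneId;

/** Builds the "startMillis_endMillis" argument covering one whole calendar day. */
final class DayRangeArg {

    static String forDay(LocalDate day, ZoneId zone) {
        long start = day.atStartOfDay(zone).toInstant().toEpochMilli();           // inclusive lower bound
        long end = day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli(); // exclusive upper bound
        return start + "_" + end;
    }

    public static void main(String[] args) {
        System.out.println(forDay(LocalDate.of(2022, 7, 9), ZoneId.of("Asia/Shanghai")));
        // 1657296000000_1657382400000
    }
}
```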

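Log of the full-table row count (the last two lines are the program's own output: the MR job's execution time and the computed row count):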

[2022-07-10T16:32:35.688+08:00] [INFO] hadoop.mapreduce.Job.monitorAndPrintJob(Job.java 1417) [main] : Job job_8766029101801_70452735 completed successfully
[2022-07-10T16:32:35.771+08:00] [INFO] hadoop.mapreduce.Job.monitorAndPrintJob(Job.java 1424) [main] : Counters: 35
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=355679
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=101
HDFS: Number of bytes written=0
HDFS: Number of read operations=1
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=2372166
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=395361
Total vcore-seconds taken by all map tasks=790722
Total megabyte-seconds taken by all map tasks=2429097984
Total gcore-seconds taken by all map tasks=0
Map-Reduce Framework
Map input records=59999731
Map output records=0
Input split bytes=101
Reduce shuffle bytes=0
Reduce input records=0
Reduce output records=0
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=3733
CPU time spent (ms)=88380
Physical memory (bytes) snapshot=1659187200
Virtual memory (bytes) snapshot=9006567424
Total committed heap usage (bytes)=2906652672
com.jd.ads.RowCounterTest$RowCounterMapper$Counters
ROWS=59999731
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
MR任务执行时间:415 s!
计算行数:59999731 行!

Log of the row count for 2022-07-09 (again ending with the job's execution time and the computed row count):

[2022-07-10T16:28:19.261+08:00] [INFO] hadoop.mapreduce.Job.monitorAndPrintJob(Job.java 1417) [main] : Job job_8766029101801_70452892 completed successfully
[2022-07-10T16:28:19.345+08:00] [INFO] hadoop.mapreduce.Job.monitorAndPrintJob(Job.java 1424) [main] : Counters: 35
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=355771
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=101
HDFS: Number of bytes written=0
HDFS: Number of read operations=1
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=110814
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=18469
Total vcore-seconds taken by all map tasks=36938
Total megabyte-seconds taken by all map tasks=113473536
Total gcore-seconds taken by all map tasks=0
Map-Reduce Framework
Map input records=55895
Map output records=0
Input split bytes=101
Reduce shuffle bytes=0
Reduce input records=0
Reduce output records=0
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=839
CPU time spent (ms)=7920
Physical memory (bytes) snapshot=647380992
Virtual memory (bytes) snapshot=8851468288
Total committed heap usage (bytes)=2630877184
com.jd.ads.RowCounterTest$RowCounterMapper$Counters
ROWS=55895
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
MR任务执行时间:41 s!
计算行数:55895 行!

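3. Implementing HBase Row Counting with a Coprocessor

Compared with speeding up the scan through filters, a coprocessor generally gives a bigger gain: the counting runs inside the RegionServers, region by region, and only the per-region counts travel back to the client.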
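
The shape of that computation can be shown with a toy model. This is not the coprocessor API: RegionCountSketch and its methods are hypothetical, each inner list stands in for the rows of one region, and the parallel stream merely imitates regions being counted concurrently.

```java
import java.util.List;

/** Toy model of a server-side count: each region reports one number, the client adds them up. */
final class RegionCountSketch {

    /** Stands in for the work done next to the data: count locally, return a single long. */
    static long countInRegion(List<String> rowKeys) {
        return rowKeys.size();
    }

    /** Stands in for the client: fan out to all regions, then sum the partial counts. */
    static long totalRows(List<List<String>> regions) {
        return regions.parallelStream()
                .mapToLong(RegionCountSketch::countInRegion)
                .sum();
    }

    public static void main(String[] args) {
        List<List<String>> regions = List.of(
                List.of("row-a", "row-b"),
                List.of("row-c"),
                List.of());
        System.out.println(totalRows(regions)); // 3
    }
}
```

With a client-side or MapReduce scan, every counted row still produces a result that leaves the RegionServer; here only one number per region does.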

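HBase already ships a coprocessor with simple aggregate functions, org.apache.hadoop.hbase.coprocessor.AggregateImplementation. Normally you call the HBase API from code to disable the table, register the coprocessor, and enable the table again. At JD, however, the HBase administration team restricts the disable operation: ordinary users have no permission to disable production tables, so they cannot register a coprocessor on a table themselves.

Application example: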

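import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;

public class AgreementTest {
    public static void main(String[] args) {
        Configuration configuration = HBaseConfiguration.create();

        // HBase connection and client settings (credentials masked).
        configuration.set("bdp.hbase.erp", "******");
        configuration.set("bdp.hbase.accesskey", "******");
        configuration.set("bdp.hbase.instance.name", "******");
        configuration.set("hbase.zookeeper.quorum", "******");
        configuration.set("hbase.client.retries.number", "3");
        configuration.set("hbase.rpc.timeout", "3600000");
        configuration.set("hbase.client.operation.timeout", "3600000");

        // MapReduce settings carried over from RowCounterTest. This client does not
        // launch an MR job, so they have no effect here.
        configuration.set("mapreduce.map.memory.mb", "8192");
        configuration.set("mapreduce.map.java.opts", "-Xmx6144M");
        configuration.set("mapreduce.map.cpu.vcores", "2");
        configuration.set("mapreduce.reduce.memory.mb", "8192");
        configuration.set("mapreduce.reduce.java.opts", "-Xmx6144M");
        configuration.set("mapreduce.reduce.cpu.vcores", "4");
        configuration.set("yarn.app.mapreduce.am.resource.mb", "8192");
        configuration.set("yarn.app.mapreduce.am.command-opts", "-Xmx6144m");
        configuration.set("yarn.app.mapreduce.am.resource.cpu-vcores", "3");
        configuration.set("mapreduce.task.io.sort.mb", "1024");
        configuration.set("mapreduce.job.reduce.slowstart.completedmaps", "1");

        TableName tableName = TableName.valueOf("hb_ibd:HB_IBD_UNBOUNDER_TRADE_BRAND");
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";

        // try-with-resources closes the connection, the admin, and the aggregation client.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin();
             AggregationClient ac = new AggregationClient(configuration)) {

            // Register the aggregation coprocessor on the table: disable, modify, enable.
            admin.disableTable(tableName);
            try {
                HTableDescriptor descriptor = admin.getTableDescriptor(tableName);
                if (!descriptor.hasCoprocessor(coprocessorClass)) {
                    descriptor.addCoprocessor(coprocessorClass);
                    admin.modifyTable(tableName, descriptor);
                }
            } finally {
                // Re-enable the table even if the modification fails.
                admin.enableTable(tableName);
            }

            // The row count itself is computed on the server side by the coprocessor.
            Scan scan = new Scan();
            long rowcount = ac.rowCount(tableName, new LongColumnInterpreter(), scan);
            // Prints the row count result.
            System.out.println("行数计算结果:" + rowcount);

        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}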

Command line:

hadoop jar AgreementTest-1.0-SNAPSHOT.jar

Log of the permission error raised by the full-table row count:

[2022-08-07T17:16:50.653+08:00] [INFO] hbase.client.HBaseAdmin.call(HBaseAdmin.java 1268) [main] : Started disable of hb_ibd:HB_IBD_PRO_ANALYSIS_FLOW_RECENT_NEW
[2022-08-07T17:16:50.776+08:00] [INFO] hbase.client.HBaseAdmin.call(HBaseAdmin.java 1268) [main] : Started disable of hb_ibd:HB_IBD_PRO_ANALYSIS_FLOW_RECENT_NEW
[2022-08-07T17:16:50.984+08:00] [INFO] hbase.client.HBaseAdmin.call(HBaseAdmin.java 1268) [main] : Started disable of hb_ibd:HB_IBD_PRO_ANALYSIS_FLOW_RECENT_NEW
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=3, exceptions:
Sun Aug 07 17:16:50 CST 2022, RpcRetryingCaller{globalStartTime=1659863810545, pause=100, retries=3, quotaRetries=63}, org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.security.AccessDeniedException): org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for you to disable table operation,hbase admin is needed.
at com.jd.hbase.coprocessor.auth.AclMasterObserver.checkDDLPermission(AclMasterObserver.java:260)
at com.jd.hbase.coprocessor.auth.AclMasterObserver.preDisableTable(AclMasterObserver.java:180)
at org.apache.hadoop.hbase.master.MasterCoprocessorHost$43.call(MasterCoprocessorHost.java:550)
at org.apache.hadoop.hbase.master.MasterCoprocessorHost.execOperation(MasterCoprocessorHost.java:1279)
at org.apache.hadoop.hbase.master.MasterCoprocessorHost.preDisableTable(MasterCoprocessorHost.java:546)
at org.apache.hadoop.hbase.master.HMaster.disableTable(HMaster.java:2090)
at org.apache.hadoop.hbase.master.MasterRpcServices.disableTable(MasterRpcServices.java:590)
at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java:59511)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2147)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:107)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:263)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:243)