HBase表数据倾斜治理_Region切分管理与并行读取单Region

HBase表数据倾斜治理_Region切分管理与并行读取单Region

1.Region切分管理

1.1 Region切分

1.1.1预分区

预分区就是在建表的时候预先设置拆分点,比如设置9个拆分点,那么建表时就会创建10个region,后序存入的数据与拆分点按字典序比较大小,决定存入哪个region。

拆分点的设置方式主要有四种:直接指定拆分点列表、HexStringSplit算法、UniformSplit算法、自定义算法。

HBase存储底层

hbase以二进制码流的形式存储数据,存储单元是byte,rowkey和value都是以byte[]的形式存储的。

hbase中rowkey的大小比较方式是按照字典序进行比较,其实本质就是byte[]按照字典序进行比较。

上图中第一幅图为按照UniformSplit算法建立的10个region分区信息,下图为表中数据示例。我们一定要牢记的一点就是hbase中保存的startkey、endkey、rowkey本质都是byte[],而我们在图一中看到的startkey、endkey和图二中看到的rowkey都是由byte[]解码得到的可供阅读的字符串。

byte[]与字符串之间的转换又叫编码和解码。编码和解码的算法有很多,常见的有ASCII编码、GBK编码、UTF-8编码、16进制字符串编码。那么很明显以上两幅图中使用的也是一种特殊的解码方式,可以看出这种解码方式将ASCII字符集中0-9、A-Z、a-z这几个字符对应的byte按照某种解码方式解码为ASCII字符集中的字符,而除此之外的其他byte按照16进制字符串进行解码后再加上\x标识。

HexStringSplit算法

HexStringSplit算法把字符串”00000000”到”FFFFFFFF”之间的字符串进行n等分之后得到分割点字符串进行编码,将这些分割点字符串编码后得到byte[]作为预分区拆分点。

注意HexStringSplit算法对”00000000”和”FFFFFFFF”这两个字符串进行n等分,这与对”00000000”和”FFFFFFFF”这两个字符串编码得到的byte[]进行n等分是完全不同的,因为”00000000”和”FFFFFFFF”编码之后它们之间会多出很多0-F对应的byte之外的byte,比如<、=、>等字符对应的byte。

由上述分析也可知,只有rowkey在编码前就是以十六进制字符串作为前缀的数据,才适合使用HexStringSplit算法来进行预分区建表。

UniformSplit算法

UniformSplit算法把new byte[0]和new byte[8]{\xFF, \xFF, \xFF, \xFF, \xFF, \xFF, \xFF, \xFF}之间byte[]进行n等分之后得到的分割点byte[]作为预分区拆分点。上图图一就有使用UniformSplit算法进行10等分得到的9个拆分点。

1.1.2自动分区

自动分区是指region在每次执行完memstore flush和compaction之后都会检查region存储是否达到split阈值,当达到split阈值时则会进行自动切分。进行split时并不是马上将region中的数据分成两份,而是新建两个region,这两个新region中存储着原region中hfile数据文件的引用。当新region中的引用文件被compaction的时候才会真正将原region中的数据复制到新region当中,而原region不再被引用时才可以被删除。

注意一点,如果当一个region当中存在引用文件时,该region无法被split。那么就存在一种特殊情况,当一个region中的数据快速增长,然后进行split,子region中的数据同样快速增长导致子region中的compaction始终在合并新的hfile而没有合并到应用文件,那么就会使得子region无法split,这样就会出现大于限定阈值的region。

hbase2支持7种region自动分区策略,目前默认使用的是SteppingSplitPolicy分区策略,也可以自行设置分区阈值。

1.1.3强制分区

hbase提供了强制分区的api,过程与自动分区类似,可以不穿分割点参数,则默认以region中最大store的midkey作为分割点,也自行指定一个分割点。

1.2 Region合并

当删除大量数据,每个region都变得较小时,可以使用hbase提供的region合并api来减少region数量,避免资源浪费。

1.3 Hfile合并

hbase每隔一段时间都要进行数据整理,清除过期数据、旧版本数据、被删除数据,减小文件数量,这么做可以提高查询效率,降低存储消耗,但是并不能提升写效率。

Minor Compaction

一次Minor Compaction默认会将store中的在规定大小范围内的10个年龄最大的hfile合并成一个hfile,会删除过期数据,但是不会删除旧版本数据和被删除数据。

hbase默认在每次memstore flush操作之后会检查是否满足Minor Compaction条件;且会通过CompactionChecker周期轮询线程每10000s检查一次是否满足Minor Compaction条件;hbase也提供了执行Minor Compaction操作的api,可以手动触发。

Major Compaction

一次Major Compaction会将store中的所有hfile合并成一个hfile,会删除过期数据、旧版本数据、被删除数据。

hbase默认7天执行一次Major Compaction,但是Major Compaction对整个hbase集群的压力非常大,一般建议设置hbase.hregion.majorcompaction=0关闭自动Major Compaction,通过api采取手动触发的方式定期在访问量较少时进行Major Compaction。

2.并行读取单Region

2.1默认情况

当我们使用MapReduce读取hbase表或者hbase快照时,使用TableInputFormat和TableSnapshotInputFormat作为输入数据的格式类,默认是一个region对应一个map任务。当单个region较大时,比如达到50G大概需要12个小时才能读取完成。

题外话,当我们使用MapReduce将数据bulkload到hbase表中时,默认是一个region对应一个reduce任务。

2.2 hbase.mapreduce.input.mappers.per.region

通过设置job任务的hbase.mapreduce.input.mappers.per.region参数可以实现多个map任务并行读取单个region上的数据,但是每个map任务读取的数据范围是使用类似于UniformSplit预分区算法得到的,本质就是把该region的startkey和endkey之间的所有byte[]进行n等分,然后每个map读取其中一个区间。

如果某个region内部的数据本身倾斜就非常严重,比如该region中大部分数据的rowkey就集中在上述分区中的一个区间中,那么这种单region的分片读取效果就非常差。

实现上述逻辑的hbase源代码在TableInputFormatBase类的getSplits()和createNInputSplitsUniform()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public List<InputSplit> getSplits(JobContext context) throws IOException {
boolean closeOnFinish = false;

// Just in case a subclass is relying on JobConfigurable magic.
if (table == null) {
initialize(context);
closeOnFinish = true;
}

// null check in case our child overrides getTable to not throw.
try {
if (getTable() == null) {
// initialize() must not have been implemented in the subclass.
throw new IOException(INITIALIZATION_ERROR);
}
} catch (IllegalStateException exception) {
throw new IOException(INITIALIZATION_ERROR, exception);
}
try {
List<InputSplit> splits = oneInputSplitPerRegion();

// set same number of mappers for each region
if (context.getConfiguration().get("hbase.mapreduce.input.mappers.per.region") != null) {
int nSplitsPerRegion = context.getConfiguration().getInt("hbase.mapreduce.input.mappers.per.region", 1);
List<InputSplit> res = new ArrayList<>();
for (int i = 0; i < splits.size(); i++) {
List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
res.addAll(tmp);
}
return res;
}

//The default value of "hbase.mapreduce.input.autobalance" is false.
if (context.getConfiguration().getBoolean("hbase.mapreduce.input.autobalance", false)) {
long maxAveRegionSize = context.getConfiguration()
.getLong("hbase.mapreduce.input.average.regionsize", 8L*1073741824); //8GB
return calculateAutoBalancedSplits(splits, maxAveRegionSize);
}

// return one mapper per region
return splits;
} catch (NamingException e) {
throw new IOException(e);
} finally {
if (closeOnFinish) {
closeTable();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
throws IllegalArgumentIOException {
if (split == null || !(split instanceof TableSplit)) {
throw new IllegalArgumentIOException(
"InputSplit for CreateNSplitsPerRegion can not be null + "
+ "and should be instance of TableSplit");
}
//if n < 1, then still continue using n = 1
n = n < 1 ? 1 : n;
List<InputSplit> res = new ArrayList<>(n);
if (n == 1) {
res.add(split);
return res;
}

// Collect Region related information
TableSplit ts = (TableSplit) split;
TableName tableName = ts.getTable();
String regionLocation = ts.getRegionLocation();
String encodedRegionName = ts.getEncodedRegionName();
long regionSize = ts.getLength();
byte[] startRow = ts.getStartRow();
byte[] endRow = ts.getEndRow();

// For special case: startRow or endRow is empty
if (startRow.length == 0 && endRow.length == 0){
startRow = new byte[1];
endRow = new byte[1];
startRow[0] = 0;
endRow[0] = -1;
}
if (startRow.length == 0 && endRow.length != 0){
startRow = new byte[1];
startRow[0] = 0;
}
if (startRow.length != 0 && endRow.length == 0){
endRow =new byte[startRow.length];
for (int k = 0; k < startRow.length; k++){
endRow[k] = -1;
}
}

// Split Region into n chunks evenly
byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
for (int i = 0; i < splitKeys.length - 1; i++) {
//notice that the regionSize parameter may be not very accurate
TableSplit tsplit =
new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
encodedRegionName, regionSize / n);
res.add(tsplit);
}
return res;
}

2.3 hbase.mapreduce.input.autobalance

默认情况下hbase.mapreduce.input.autobalance该参数为false,当设置job任务的该参数为true时,可以将较小的region合并使用一个map任务来读取,而较大的region拆分为多个map任务来读取。但是这里将region分片的措施与上一节相同,同样是按照byte[]进行等分,对于倾斜的region意义不大。

实现上述逻辑的hbase源代码在TableInputFormatBase类的calculateAutoBalancedSplits()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
throws IOException {
if (splits.size() == 0) {
return splits;
}
List<InputSplit> resultList = new ArrayList<>();
long totalRegionSize = 0;
for (int i = 0; i < splits.size(); i++) {
TableSplit ts = (TableSplit) splits.get(i);
totalRegionSize += ts.getLength();
}
long averageRegionSize = totalRegionSize / splits.size();
// totalRegionSize might be overflow, and the averageRegionSize must be positive.
if (averageRegionSize <= 0) {
LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
"set it to Long.MAX_VALUE " + splits.size());
averageRegionSize = Long.MAX_VALUE / splits.size();
}
//if averageRegionSize is too big, change it to default as 8 GB,
if (averageRegionSize > maxAverageRegionSize) {
averageRegionSize = maxAverageRegionSize;
}
// if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region
// set default as 64M = (default hdfs block size);
if (averageRegionSize < 64 * 1048576) {
return splits;
}
for (int i = 0; i < splits.size(); i++) {
TableSplit ts = (TableSplit) splits.get(i);
TableName tableName = ts.getTable();
String regionLocation = ts.getRegionLocation();
String encodedRegionName = ts.getEncodedRegionName();
long regionSize = ts.getLength();

if (regionSize >= averageRegionSize) {
// make this region as multiple MapReduce input split.
int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
List<InputSplit> temp = createNInputSplitsUniform(ts, n);
resultList.addAll(temp);
} else {
// if the total size of several small continuous regions less than the average region size,
// combine them into one MapReduce input split.
long totalSize = regionSize;
byte[] splitStartKey = ts.getStartRow();
byte[] splitEndKey = ts.getEndRow();
int j = i + 1;
while (j < splits.size()) {
TableSplit nextRegion = (TableSplit) splits.get(j);
long nextRegionSize = nextRegion.getLength();
if (totalSize + nextRegionSize <= averageRegionSize) {
totalSize = totalSize + nextRegionSize;
splitEndKey = nextRegion.getEndRow();
j++;
} else {
break;
}
}
i = j - 1;
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
encodedRegionName, totalSize);
resultList.add(t);
}
}
return resultList;
}

2.4 hbase.mapreduce.input.mappers.per.region+hbase.mapreduce.input.autobalance

当hbase.mapreduce.input.mappers.per.region使用默认值1时,calculateAutoBalancedSplits()方法中获取到的每个split的存储量大小就是对应的每个region的存储量大小,这个值是准确的,所以判断出来的过大和过小的split也是正确的。

但是当hbase.mapreduce.input.mappers.per.region参数>1时,使用createNInputSplitsUniform()方法进行过一次分片之后,每个split的存储量大小就不是真实值了,而是通过region存储量大小除以hbase.mapreduce.input.mappers.per.region得到的,实际上region中的数据是不可能完全均匀分布的。如果同时设置了hbase.mapreduce.input.autobalance为true,那么再进行均衡时对每个split的大小判断也就不再准确了。