HBase Table Data Skew Remediation: Reading Data from an HBase Table and Bulk Loading It into an HBase Table with a Different Structure

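In HBase Table Data Skew Remediation (2) we read data from an HBase table and saved it as string files, and in HBase Table Data Skew Remediation (3) we read those string files and bulk loaded them into an HBase table. Combining the two parts lets us read data from an HBase table, rebuild the rowkey, and write the data back into HBase.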

1. Project Background

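When the data was originally written to HBase, the first field of the rowkey was not reversed to spread the keys out, which caused data skew. The requirement now is to read the data out of the HBase table, take the first field of each rowkey and reverse it, and then bulk load the data into a new table. (This example scans the data with TableScanMR; the actual project uses SnapshotScanMR for higher efficiency.)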
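
To make the effect of the reversal concrete, here is a minimal sketch in plain Java with no HBase dependency. It assumes the first rowkey field is a 4-byte integer that is bit-reversed with Integer.reverse and stored big-endian; the class and method names are hypothetical and exist only for illustration.

```java
class ReverseSpreadDemo {
    // Most significant byte of the bit-reversed value, i.e. the first rowkey
    // byte if the reversed int is serialized big-endian (an assumption here).
    static int leadingByteAfterReverse(int id) {
        return Integer.reverse(id) >>> 24;
    }

    public static void main(String[] args) {
        // Consecutive ids share the same leading bytes before reversal,
        // but differ in their leading byte afterwards.
        for (int id = 1; id <= 4; id++) {
            System.out.printf("id=%d -> leading byte 0x%02X%n", id, leadingByteAfterReverse(id));
        }
    }
}
```

For the ids 1, 2, 3 and 4 the leading bytes come out as 0x80, 0x40, 0xC0 and 0x20, so neighbouring ids no longer sort next to each other.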

2. Create the HBase Table

Create a new HBase table to hold the remediated data: create 'fellowreverse','d'

3. Create the Mapper Class

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class DataSkewCorrectMapper extends TableMapper<ImmutableBytesWritable, Put> {

    // Each map call receives one row from the source HBase table and repackages that row as a Put.
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {

        // copyBytes() returns a private copy, so the in-place reversal below cannot touch the framework's buffer.
        byte[] rowkey = reverseFirstField(row.copyBytes(), 4);
        ImmutableBytesWritable putRowkey = new ImmutableBytesWritable(rowkey);
        Put put = new Put(rowkey);
        for (Cell cell : value.listCells()) {
            // Copy the family, qualifier and value out of the cell's backing array.
            byte[] colFamily = CellUtil.cloneFamily(cell);
            byte[] colQualifier = CellUtil.cloneQualifier(cell);
            byte[] colValue = CellUtil.cloneValue(cell);

            // Put.add(family, qualifier, value) is deprecated; addColumn is its replacement.
            put.addColumn(colFamily, colQualifier, colValue);
        }
        context.write(putRowkey, put);
    }

    // Bit-reverses the first field of the rowkey (a 4-byte int or an 8-byte long) in place.
    public byte[] reverseFirstField(byte[] rowkey, int firstFieldLength) {
        byte[] reverseField = new byte[firstFieldLength];
        System.arraycopy(rowkey, 0, reverseField, 0, firstFieldLength);

        if (firstFieldLength == 4) {
            int reverseInt = Integer.reverse(Bytes.toInt(reverseField));
            reverseField = Bytes.toBytes(reverseInt);
        } else if (firstFieldLength == 8) {
            long reverseLong = Long.reverse(Bytes.toLong(reverseField));
            reverseField = Bytes.toBytes(reverseLong);
        } else {
            // Bytes.toLong needs exactly 8 bytes, so any other length is rejected explicitly.
            throw new IllegalArgumentException("first field must be 4 or 8 bytes, got " + firstFieldLength);
        }

        System.arraycopy(reverseField, 0, rowkey, 0, firstFieldLength);
        return rowkey;
    }
}
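
The rowkey transformation can be checked without a cluster. The sketch below is a plain-Java approximation of the 4-byte case: it assumes Bytes.toInt and Bytes.toBytes use big-endian order (which java.nio.ByteBuffer also does by default), and its class and method names are hypothetical. The sample key is the row `|I\x00\x002022/4/11` from the scan output in section 5.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class RowkeyReverseSketch {
    // Returns a copy of rowkey whose first 4 bytes, read as a big-endian int, are bit-reversed.
    static byte[] reverseIntPrefix(byte[] rowkey) {
        if (rowkey.length < 4) {
            throw new IllegalArgumentException("rowkey shorter than 4 bytes");
        }
        byte[] out = Arrays.copyOf(rowkey, rowkey.length);
        ByteBuffer buf = ByteBuffer.wrap(out); // big-endian by default
        buf.putInt(0, Integer.reverse(buf.getInt(0)));
        return out;
    }

    // Builds the sample key 0x7C 0x49 0x00 0x00 followed by the ASCII date "2022/4/11".
    static byte[] sampleRowkey() {
        byte[] suffix = "2022/4/11".getBytes(StandardCharsets.US_ASCII);
        byte[] key = new byte[4 + suffix.length];
        key[0] = 0x7C; // '|'
        key[1] = 0x49; // 'I'
        System.arraycopy(suffix, 0, key, 4, suffix.length);
        return key;
    }

    public static void main(String[] args) {
        byte[] r = reverseIntPrefix(sampleRowkey());
        System.out.printf("%02X %02X %02X %02X%n", r[0] & 0xFF, r[1] & 0xFF, r[2] & 0xFF, r[3] & 0xFF);
    }
}
```

Running main prints `00 00 92 3E`, which is the prefix `\x00\x00\x92>` shown for the 2022/4/11 row after remediation; the date suffix is left untouched.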

4. Create the Driver Class

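import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
// In HBase 2.x this class lives in org.apache.hadoop.hbase.tool instead.
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class DataSkewCorrector {
    public static void main(String[] args) throws Exception {

        String hdfsRoot = "hdfs://localhost:9005/"; // HDFS root directory
        String srcTableName = "fellowjava";          // source HBase table to read
        TableName destTableName = TableName.valueOf("fellowreverse");

        Configuration conf = HBaseConfiguration.create();

        // The HBase connection is needed so the data can be loaded into the destination table.
        // try-with-resources closes the connection, table, locator and admin on exit.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table destTable = conn.getTable(destTableName);
             RegionLocator destLocator = conn.getRegionLocator(destTableName);
             Admin admin = conn.getAdmin()) {

            // Configure the scan over the source table.
            Job job = Job.getInstance(conf, "reverse table : " + srcTableName);
            job.setJarByClass(DataSkewCorrector.class);
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableMapperJob(srcTableName, scan, DataSkewCorrectMapper.class, ImmutableBytesWritable.class, Put.class, job, false);

            // Set the HFile output path and configure the job to write HFiles for the destination table.
            Path outputPath = new Path(hdfsRoot + "fellowreverse");
            HFileOutputFormat2.setOutputPath(job, outputPath);
            HFileOutputFormat2.configureIncrementalLoad(job, destTable, destLocator);

            // Run the MR job. Once it succeeds, hand the generated HFiles to the
            // RegionServers so the data becomes visible through the destination table.
            if (job.waitForCompletion(true)) {
                System.out.println("reverse succeeded!");
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(outputPath, admin, destTable, destLocator);
            } else {
                System.out.println("reverse failed!");
            }
        }
    }
}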

5. Before and After Remediation

Before remediation, the table data looks like this in the hbase shell:

hbase(main):003:0> scan 'fellowjava'
ROW COLUMN+CELL
,\x1D\xA0\x002022/4/4 column=d:Work, timestamp=1650883809066, value=\xE5\xAE\x9E\xE4\xB9\xA0
,\x1D\xA0\x002022/4/4 column=d:name, timestamp=1650883809066, value=zengqingfan
,\x1D\xA0\x002022/4/4 column=d:sex, timestamp=1650883809066, value=\xE7\x94\xB7
?\x03H@2022/3/18 column=d:Work, timestamp=1650883809066, value=\xE6\xAD\xA3\xE5\xBC\x8F
?\x03H@2022/3/18 column=d:name, timestamp=1650883809066, value=lijun
?\x03H@2022/3/18 column=d:sex, timestamp=1650883809066, value=\xE7\x94\xB7
|I\x00\x002022/4/11 column=d:Work, timestamp=1650883809066, value=\xE5\xAE\x9E\xE4\xB9\xA0
|I\x00\x002022/4/11 column=d:name, timestamp=1650883809066, value=lijiazheng
|I\x00\x002022/4/11 column=d:sex, timestamp=1650883809066, value=\xE7\x94\xB7
\xE3\xA9\x00\x002022/4/24 column=d:Work, timestamp=1650883809066, value=\xE6\xAD\xA3\xE5\xBC\x8F
\xE3\xA9\x00\x002022/4/24 column=d:name, timestamp=1650883809066, value=jiazhengyang
\xE3\xA9\x00\x002022/4/24 column=d:sex, timestamp=1650883809066, value=\xE7\x94\xB7
\xF1\xC6$\x002022/3/16 column=d:Work, timestamp=1650883809066, value=\xE6\xAD\xA3\xE5\xBC\x8F
\xF1\xC6$\x002022/3/16 column=d:name, timestamp=1650883809066, value=qiuyuchen
\xF1\xC6$\x002022/3/16 column=d:sex, timestamp=1650883809066, value=\xE7\x94\xB7
5 row(s) in 0.1130 seconds

After remediation, the table data looks like this in the hbase shell:

hbase(main):001:0> scan 'fellowreverse'
ROW COLUMN+CELL
\x00\x00\x92>2022/4/11 column=d:Work, timestamp=1651065122474, value=\xE5\xAE\x9E\xE4\xB9\xA0
\x00\x00\x92>2022/4/11 column=d:name, timestamp=1651065122474, value=lijiazheng
\x00\x00\x92>2022/4/11 column=d:sex, timestamp=1651065122474, value=\xE7\x94\xB7
\x00\x00\x95\xC72022/4/24 column=d:Work, timestamp=1651065122474, value=\xE6\xAD\xA3\xE5\xBC\x8F
\x00\x00\x95\xC72022/4/24 column=d:name, timestamp=1651065122474, value=jiazhengyang
\x00\x00\x95\xC72022/4/24 column=d:sex, timestamp=1651065122474, value=\xE7\x94\xB7
\x00\x05\xB842022/4/4 column=d:Work, timestamp=1651065122474, value=\xE5\xAE\x9E\xE4\xB9\xA0
\x00\x05\xB842022/4/4 column=d:name, timestamp=1651065122474, value=zengqingfan
\x00\x05\xB842022/4/4 column=d:sex, timestamp=1651065122474, value=\xE7\x94\xB7
\x00$c\x8F2022/3/16 column=d:Work, timestamp=1651065122474, value=\xE6\xAD\xA3\xE5\xBC\x8F
\x00$c\x8F2022/3/16 column=d:name, timestamp=1651065122474, value=qiuyuchen
\x00$c\x8F2022/3/16 column=d:sex, timestamp=1651065122474, value=\xE7\x94\xB7
\x02\x12\xC0\xFC2022/3/18 column=d:Work, timestamp=1651065122474, value=\xE6\xAD\xA3\xE5\xBC\x8F
\x02\x12\xC0\xFC2022/3/18 column=d:name, timestamp=1651065122474, value=lijun
\x02\x12\xC0\xFC2022/3/18 column=d:sex, timestamp=1651065122474, value=\xE7\x94\xB7
5 row(s) in 0.1880 seconds

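As the output shows, the first four bytes of each rowkey have changed, and the rows in the HBase table are now sorted in a different order.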
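
The new row order can also be reproduced offline. The following sketch takes the five 4-byte prefixes from the 'fellowjava' scan, bit-reverses them, and sorts them as unsigned integers. It assumes that comparing distinct 4-byte big-endian prefixes as unsigned ints gives the same order as HBase's byte-wise rowkey comparison; the class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RowOrderCheck {
    // Rowkey prefixes from the 'fellowjava' scan, mapped to the date suffix of each row.
    static Map<Integer, String> sampleRows() {
        Map<Integer, String> rows = new LinkedHashMap<>();
        rows.put(0x2C1DA000, "2022/4/4");  // ,\x1D\xA0\x00
        rows.put(0x3F034840, "2022/3/18"); // ?\x03H@
        rows.put(0x7C490000, "2022/4/11"); // |I\x00\x00
        rows.put(0xE3A90000, "2022/4/24"); // \xE3\xA9\x00\x00
        rows.put(0xF1C62400, "2022/3/16"); // \xF1\xC6$\x00
        return rows;
    }

    // Bit-reverses every prefix and returns the date suffixes in unsigned ascending prefix order.
    static List<String> orderAfterReverse(Map<Integer, String> rows) {
        TreeMap<Integer, String> sorted = new TreeMap<>(Integer::compareUnsigned);
        for (Map.Entry<Integer, String> e : rows.entrySet()) {
            sorted.put(Integer.reverse(e.getKey()), e.getValue());
        }
        return new ArrayList<>(sorted.values());
    }

    public static void main(String[] args) {
        System.out.println(orderAfterReverse(sampleRows()));
    }
}
```

The resulting order is 2022/4/11, 2022/4/24, 2022/4/4, 2022/3/16, 2022/3/18, the same sequence as in the 'fellowreverse' scan above.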