HBase Table Data Skew Remediation: Reading an Exported Snapshot and Bulk-Loading It into an HBase Table with a Different Structure


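1. Two scenarios for reading HBase snapshots with SnapshotScanMR

As mentioned in the previous post, the main purpose of a snapshot here is to let us bypass the RegionServers when scanning HBase table data. The HFiles are read directly from HDFS, which takes load off the RegionServers.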

SnapshotScanMR can read two kinds of snapshots:

  • snapshots stored on the local HBase cluster;
  • snapshots exported to the local cluster from another HBase cluster.

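1.1 Reading a local snapshot

Some table snapshots are stored in HBase itself: these are the ones the list_snapshots command shows in the hbase shell. Their snapshot files are kept under the HBase root directory.

The HBase root directory is configured in hbase-site.xml as follows: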

<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9005/hbase</value>
</property>

The program loads this parameter automatically when it creates the HBaseConfiguration object. initTableSnapshotMapperJob then looks for snapshot files under this directory.
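The sketch below builds the completed-snapshot directory from hbase.rootdir, as a rough illustration of where that lookup lands. It assumes the standard layout `<hbase.rootdir>/.hbase-snapshot/<snapshot name>`, which matches what HBase's own SnapshotDescriptionUtils.getCompletedSnapshotDir computes. The class name SnapshotDirs is hypothetical, and the sketch uses only plain strings so it runs without HBase on the classpath.

```java
// Hypothetical helper: mirrors the <rootdir>/.hbase-snapshot/<name> layout
// HBase uses for completed snapshots. Plain strings only, no HBase jars needed.
final class SnapshotDirs {
    // Name of the directory under hbase.rootdir that holds completed snapshots.
    static final String SNAPSHOT_DIR_NAME = ".hbase-snapshot";

    private SnapshotDirs() {}

    /** Returns <rootDir>/.hbase-snapshot/<snapshotName>, avoiding a double slash. */
    static String completedSnapshotDir(String rootDir, String snapshotName) {
        String base = rootDir.endsWith("/")
                ? rootDir.substring(0, rootDir.length() - 1)
                : rootDir;
        return base + "/" + SNAPSHOT_DIR_NAME + "/" + snapshotName;
    }
}
```

If hbase.rootdir points at hdfs://localhost:9005/exportSnapshot instead, the same lookup resolves inside the export directory.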

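1.2 Reading an exported snapshot

When a snapshot is exported from another HBase cluster with ExportSnapshot, the data files the snapshot points to are exported along with the snapshot files. initTableSnapshotMapperJob can read such a snapshot directly as well, with one condition. You must either set hbase.rootdir in code to the directory that holds the exported files, or move the snapshot files and data files into the HBase root directory.

Tip: if you move the exported snapshot files and data files into the HBase root directory and the version and other metadata match, list_snapshots will show the snapshot.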
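A client can check which candidate root directory actually contains the snapshot to decide between the two options above. The sketch below does this on a local file system as a stand-in for HDFS. That is an assumption made to keep the example self-contained; against HDFS you would use Hadoop's FileSystem API instead. SnapshotRootFinder is a hypothetical name. The sketch only checks that the `.hbase-snapshot/<name>` directory exists, a simplification of HBase's own validation.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: returns the first candidate root that holds the snapshot.
// Local paths stand in for HDFS; only the snapshot directory's presence is checked.
final class SnapshotRootFinder {
    private SnapshotRootFinder() {}

    static Optional<Path> findRoot(List<Path> candidateRoots, String snapshotName) {
        for (Path root : candidateRoots) {
            Path snapshotDir = root.resolve(".hbase-snapshot").resolve(snapshotName);
            if (Files.isDirectory(snapshotDir)) {
                return Optional.of(root);
            }
        }
        return Optional.empty();
    }
}
```

Suppose the lookup with [hbase root, export root] returns the export root. Then either override hbase.rootdir with that directory or move the files as described in the tip.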

2. Example

2.1 Exporting the snapshot files

The detailed structure of snapshot files will be covered in a later post.

E:\HBase\hbase\bin>hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot "fellowshell_snapshot" -copy-to hdfs://localhost:9005/exportSnapshot

Syntax:

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <snapshot name> -copy-to <target directory>
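The same export can also be launched from Java. ExportSnapshot implements Hadoop's Tool interface, so one option is ToolRunner.run(conf, new ExportSnapshot(), args). The sketch below only assembles the argument array, so it runs without HBase on the classpath. The optional -mappers flag (number of copy map tasks) is an addition not used in this post, and ExportSnapshotArgs is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles ExportSnapshot command-line arguments.
// Intended use (needs HBase on the classpath, not executed here):
//   ToolRunner.run(conf, new ExportSnapshot(), ExportSnapshotArgs.build(...));
final class ExportSnapshotArgs {
    private ExportSnapshotArgs() {}

    /** @param mappers number of copy map tasks, or null to omit the -mappers flag */
    static String[] build(String snapshotName, String copyTo, Integer mappers) {
        List<String> args = new ArrayList<>();
        args.add("-snapshot");
        args.add(snapshotName);
        args.add("-copy-to");
        args.add(copyTo);
        if (mappers != null) {
            args.add("-mappers");
            args.add(Integer.toString(mappers));
        }
        return args.toArray(new String[0]);
    }
}
```

For example, build("fellowshell_snapshot", "hdfs://localhost:9005/exportSnapshot", null) reproduces the command used above.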

The export log is shown below. It shows that the export itself runs as a MapReduce job:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/HBase/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/HBase/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-05-09 01:20:07,557 INFO [main] snapshot.ExportSnapshot: Copy Snapshot Manifest
2022-05-09 01:20:07,864 INFO [main] client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2022-05-09 01:20:08,782 INFO [main] snapshot.ExportSnapshot: Loading Snapshot 'fellowshell_snapshot' hfile list
2022-05-09 01:20:08,999 INFO [main] mapreduce.JobSubmitter: number of splits:1
2022-05-09 01:20:09,078 INFO [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1652029268976_0001
2022-05-09 01:20:09,424 INFO [main] impl.YarnClientImpl: Submitted application application_1652029268976_0001
2022-05-09 01:20:09,450 INFO [main] mapreduce.Job: The url to track the job: http://LAPTOP-1D612HIP:8088/proxy/application_1652029268976_0001/
2022-05-09 01:20:09,451 INFO [main] mapreduce.Job: Running job: job_1652029268976_0001
2022-05-09 01:20:19,638 INFO [main] mapreduce.Job: Job job_1652029268976_0001 running in uber mode : false
2022-05-09 01:20:19,644 INFO [main] mapreduce.Job: map 0% reduce 0%
2022-05-09 01:20:24,732 INFO [main] mapreduce.Job: map 100% reduce 0%
2022-05-09 01:20:24,739 INFO [main] mapreduce.Job: Job job_1652029268976_0001 completed successfully
2022-05-09 01:20:24,803 INFO [main] mapreduce.Job: Counters: 37
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=141262
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=6092
HDFS: Number of bytes written=5885
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=2742
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=2742
Total vcore-seconds taken by all map tasks=2742
Total megabyte-seconds taken by all map tasks=2807808
Map-Reduce Framework
Map input records=1
Map output records=0
Input split bytes=207
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=34
CPU time spent (ms)=874
Physical memory (bytes) snapshot=341762048
Virtual memory (bytes) snapshot=361213952
Total committed heap usage (bytes)=196083712
org.apache.hadoop.hbase.snapshot.ExportSnapshot$Counter
BYTES_COPIED=5885
BYTES_EXPECTED=5885
BYTES_SKIPPED=0
COPY_FAILED=0
FILES_COPIED=1
FILES_SKIPPED=0
MISSING_FILES=0
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
2022-05-09 01:20:24,806 INFO [main] snapshot.ExportSnapshot: Finalize the Snapshot Export
2022-05-09 01:20:24,815 INFO [main] snapshot.ExportSnapshot: Verify snapshot integrity
2022-05-09 01:20:24,845 INFO [main] snapshot.ExportSnapshot: Export Completed: fellowshell_snapshot
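The ExportSnapshot counters at the end of the log can also be checked programmatically instead of by eye. The sketch below parses the NAME=value counter lines from log text and applies a simple health rule:

  • COPY_FAILED and MISSING_FILES are both 0;
  • BYTES_COPIED + BYTES_SKIPPED equals BYTES_EXPECTED.

That rule matches the log above (5885 + 0 = 5885), but it is an assumption about how the counters relate, not documented behavior. ExportCounterCheck is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical post-export check based on the ExportSnapshot$Counter lines
// printed in the job log (e.g. "BYTES_COPIED=5885").
final class ExportCounterCheck {
    // Only upper-case names with underscores match, so lines such as
    // "HDFS: Number of bytes read=6092" are ignored.
    private static final Pattern COUNTER = Pattern.compile("^\\s*([A-Z_]+)=(\\d+)\\s*$");

    private ExportCounterCheck() {}

    static Map<String, Long> parse(String log) {
        Map<String, Long> counters = new HashMap<>();
        for (String line : log.split("\\R")) {
            Matcher m = COUNTER.matcher(line);
            if (m.matches()) {
                counters.put(m.group(1), Long.parseLong(m.group(2)));
            }
        }
        return counters;
    }

    /** Assumed rule: nothing failed or missing, and every expected byte was copied or skipped. */
    static boolean looksHealthy(Map<String, Long> c) {
        Long expected = c.get("BYTES_EXPECTED");
        long failed = c.getOrDefault("COPY_FAILED", 0L);
        long missing = c.getOrDefault("MISSING_FILES", 0L);
        long copied = c.getOrDefault("BYTES_COPIED", 0L);
        long skipped = c.getOrDefault("BYTES_SKIPPED", 0L);
        return expected != null && failed == 0 && missing == 0 && copied + skipped == expected;
    }
}
```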

2.2 Creating the table on the new HBase cluster

hbase(main):017:0> create "exportSnapshotReverse","d"

2.3 Creating the mapper class

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Rewrites every row with the bit order of its leading field reversed, so that
// sequential keys are spread over the key space of the target table.
public class ReverseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
        // copyBytes() returns a copy, so the Result's own row array is not modified in place.
        byte[] rowkey = reverseFirstField(row.copyBytes(), 4);
        ImmutableBytesWritable putRowkey = new ImmutableBytesWritable(rowkey);
        Put put = new Put(rowkey);
        for (Cell cell : value.listCells()) {
            // Copy family, qualifier and value, keeping the original cell timestamp.
            put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
                    cell.getTimestamp(), CellUtil.cloneValue(cell));
        }
        context.write(putRowkey, put);
    }

    // Reverses the bit order of the first field: 4 bytes = int, 8 bytes = long (big-endian, as Bytes uses).
    public byte[] reverseFirstField(byte[] rowkey, int firstFieldLength) {
        byte[] reverseField;
        if (firstFieldLength == 4) {
            reverseField = Bytes.toBytes(Integer.reverse(Bytes.toInt(rowkey, 0)));
        } else if (firstFieldLength == 8) {
            reverseField = Bytes.toBytes(Long.reverse(Bytes.toLong(rowkey, 0)));
        } else {
            throw new IllegalArgumentException("first field must be 4 or 8 bytes, got " + firstFieldLength);
        }
        System.arraycopy(reverseField, 0, rowkey, 0, firstFieldLength);
        return rowkey;
    }
}
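The core of the mapper is reverseFirstField. It reads the first 4 bytes of the rowkey as a big-endian int and reverses its bit order with Integer.reverse. As a result, sequential leading IDs, which would otherwise tend to pile up in one region, are scattered across the key space. The sketch below reproduces that step with java.nio.ByteBuffer, which is big-endian by default, matching Bytes.toInt/Bytes.toBytes, so it runs without HBase. RowkeyReversal is a hypothetical name, and it works on a copy of the input array.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-alone version of the mapper's 4-byte reverseFirstField.
final class RowkeyReversal {
    private RowkeyReversal() {}

    /** Returns a copy of rowkey whose first 4 bytes (a big-endian int) are bit-reversed. */
    static byte[] reverseIntPrefix(byte[] rowkey) {
        if (rowkey.length < 4) {
            throw new IllegalArgumentException("rowkey shorter than 4 bytes");
        }
        byte[] out = Arrays.copyOf(rowkey, rowkey.length);
        int reversed = Integer.reverse(ByteBuffer.wrap(out, 0, 4).getInt());
        ByteBuffer.wrap(out, 0, 4).putInt(reversed);
        return out;
    }
}
```

For IDs 1, 2 and 3 the prefixes become 0x80000000, 0x40000000 and 0xC0000000. These fall in different quarters of the unsigned byte order HBase uses for rowkeys. Bit reversal is its own inverse, so applying it again recovers the original ID.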

2.4 Creating the driver class

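import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ExportReverseDriver {
    public static void main(String[] args) throws Exception {

        String hdfsRoot = "hdfs://localhost:9005/"; // HDFS root directory
        String snapshotName = "fellowshell_snapshot";
        String destTableName = "exportSnapshotReverse";
        String tmpDir = "/testpath"; // no file system given, so it is created under fs.defaultFS

        Configuration conf = HBaseConfiguration.create();

        /* Important: on my machine the default ZooKeeper port 2181 was already taken, so HBase
           starts ZooKeeper on port 2182. hbase-site.xml therefore points HBase at port 2182,
           and the Java client must set hbase.zookeeper.property.clientPort to 2182 as well. */
        conf.set("hbase.zookeeper.property.clientPort", "2182");

        // Important: point hbase.rootdir at the folder holding the exported snapshot files.
        // Apart from locating the snapshot, this program does not use hbase.rootdir again, so the
        // override does not affect HBase's own directory layout; take care if later code needs it.
        conf.set("hbase.rootdir", "hdfs://localhost:9005/exportSnapshot");

        // Important: initTableSnapshotMapperJob creates a temporary folder under tmpDir, rebuilds the
        // snapshot table's regions there from the snapshot and data files, and then scans them.
        // An unqualified tmpDir is created under fs.defaultFS, a qualified one is used as-is, and the
        // scan looks for the regions under fs.defaultFS. So either leave the file system out of
        // tmpDir or make it the same as fs.defaultFS.
        conf.set("fs.defaultFS", hdfsRoot);

        TableName destTable = TableName.valueOf(destTableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(destTable);
             Admin admin = conn.getAdmin();
             RegionLocator locator = conn.getRegionLocator(destTable)) {

            Job job = Job.getInstance(conf, "reverse table : " + snapshotName);
            job.setJarByClass(ExportReverseDriver.class); // ship this jar when running on YARN
            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, ReverseMapper.class,
                    ImmutableBytesWritable.class, Put.class, job, false, new Path(tmpDir));

            // Write HFiles that are partitioned and sorted by the target table's regions.
            Path outputPath = new Path(hdfsRoot + destTableName);
            HFileOutputFormat2.setOutputPath(job, outputPath);
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);

            if (job.waitForCompletion(true)) {
                System.out.println("reverse succeeded!");
                // Hand the generated HFiles over to the target table's regions.
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(outputPath, admin, table, locator);
            } else {
                System.out.println("reverse failed!");
            }
        }
    }
}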
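HFileOutputFormat2.configureIncrementalLoad sets one reduce task per region of the target table. It also partitions the map output by the regions' start keys, so each generated HFile fits inside a single region. The sketch below illustrates that routing in plain Java: a binary search over sorted region start keys, using the unsigned byte comparison HBase applies to rowkeys. RegionRouter is a hypothetical name; the sketch is an illustration, not HBase's partitioner itself. exportSnapshotReverse was created without split points, so as long as it has a single region, all rows go through one reducer.

```java
// Hypothetical illustration: route a rowkey to the region whose
// [startKey, nextStartKey) range contains it.
final class RegionRouter {
    private RegionRouter() {}

    /** Lexicographic comparison treating bytes as unsigned, as HBase orders rowkeys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /**
     * @param startKeys region start keys sorted ascending; the first one is the empty key
     * @return index of the region (and therefore the reducer) that receives rowkey
     */
    static int regionIndex(byte[][] startKeys, byte[] rowkey) {
        int lo = 0;
        int hi = startKeys.length - 1;
        // Binary search for the last start key that is <= rowkey.
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;
            if (compareUnsigned(startKeys[mid], rowkey) <= 0) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }
}
```

Consider a table pre-split at 0x40, 0x80 and 0xC0. The bit-reversed keys for IDs 1, 2 and 3 from the mapper would land in regions 2, 1 and 3 respectively.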

2.5 Key points

2.5.1 hbase.rootdir

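conf.set("hbase.rootdir", "hdfs://localhost:9005/exportSnapshot") points hbase.rootdir at the folder that holds the exported snapshot files.

Check, however, whether the rest of the program still needs this parameter. If it does, make sure the override does not disturb HBase's own directory layout.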

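2.5.2 The tmp parameter of initTableSnapshotMapperJob

initTableSnapshotMapperJob creates a temporary folder under the given tmp directory to hold regions that the HBase client recreates temporarily. It first rebuilds the snapshot table's regions in that folder from the snapshot files and data files, then scans those regions to read the data.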

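When the tmp folder is created:

  • if tmp does not specify a file system, e.g. /testpath, the folder is fs.defaultFS + "/testpath";
  • if it does, e.g. hdfs://localhost:9005/testpath, the folder is exactly that value.

When the data is scanned:

  • the snapshot table's regions are looked up under fs.defaultFS.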

If fs.defaultFS is not set in core-site.xml, it defaults to the local file system, file:///. It can also be overridden in code with conf.set("fs.defaultFS", hdfsRoot).

In short, however you configure fs.defaultFS and the tmp parameter, make sure the tmp folder is created on the same file system it is later scanned from.
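This rule can be written as a small check. The sketch below follows the behavior described in this section:

  • an unqualified tmp path (with a leading slash, as in /testpath) is created under fs.defaultFS;
  • a qualified tmp path is used as-is;
  • the regions are later read from fs.defaultFS.

The sketch flags a tmp path whose scheme or authority differs from fs.defaultFS. TmpDirCheck is a hypothetical name, and java.net.URI stands in for Hadoop's Path qualification.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical check for the rule above: the tmp (restore) folder and the
// location scanned later must be on the same file system (fs.defaultFS).
final class TmpDirCheck {
    private TmpDirCheck() {}

    /** Where the tmp folder ends up: qualified paths as-is, others under fs.defaultFS. */
    static URI creationLocation(String tmp, String defaultFs) {
        URI tmpUri = URI.create(tmp);
        if (tmpUri.getScheme() != null) {
            return tmpUri;
        }
        return URI.create(defaultFs).resolve(tmp);
    }

    /** True when the tmp folder is created on the file system that is scanned afterwards. */
    static boolean sameFileSystem(String tmp, String defaultFs) {
        URI created = creationLocation(tmp, defaultFs);
        URI scanned = URI.create(defaultFs);
        return Objects.equals(created.getScheme(), scanned.getScheme())
                && Objects.equals(created.getAuthority(), scanned.getAuthority());
    }
}
```

For example, a tmp of hdfs://localhost:9005/testpath combined with the default fs.defaultFS of file:/// is flagged. That is the mismatch this section warns about.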