HBase_HBase Data Skew Governance: Patent and Production Code


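I. Patent Draft

A Data Skew Governance Method for HBase Tables with Multi-Dimensional Composite Rowkeys

Technical Field

The present invention belongs to the technical field of HBase databases, and specifically relates to a data skew governance method for HBase tables whose rowkeys are composed of aggregation dimensions.

Background

HBase is a non-relational database suited to storing unstructured data. It is highly reliable, high-performance, column-oriented, scalable, and distributed. HBase is built on the Hadoop Distributed File System (HDFS) and relies on HDFS replication for redundant storage; physically, it stores key-value data grouped by column family, which gives it excellent read and write performance.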

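To query batches of data with flexible combinations of aggregation dimensions, a data application platform builds each rowkey by concatenating aggregation dimensions from the broadest to the narrowest, so that a rowkey prefix scan returns the batch for any leading combination of dimensions. For example, an HBase table storing SKU product data may use level-1 category ID + level-2 category ID + level-3 category ID as its rowkey. Supplying only the level-1 category ID efficiently returns all SKUs under that level-1 category; supplying level-1 + level-2 category IDs returns all SKUs under that level-2 category; and supplying all three IDs returns all SKUs under that level-3 category.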
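As a minimal sketch of the prefix idea, assuming each category ID is a `long` stored as 8 big-endian bytes (the layout HBase's `Bytes.toBytes(long)` produces), the hypothetical `CategoryRowkey` class below builds such rowkeys and checks which prefixes a row would match. Only the JDK is used; no HBase client is involved.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper: fixed-width composite rowkeys for category dimensions. */
class CategoryRowkey {
    /** Concatenates IDs (broad to narrow) as 8-byte big-endian longs, like HBase's Bytes.toBytes(long). */
    static byte[] build(long... ids) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * ids.length); // big-endian by default
        for (long id : ids) {
            buf.putLong(id);
        }
        return buf.array();
    }

    /** True if rowkey starts with prefix, i.e. a prefix scan on prefix would return this row. */
    static boolean matchesPrefix(byte[] rowkey, byte[] prefix) {
        return rowkey.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(rowkey, prefix.length), prefix);
    }

    public static void main(String[] args) {
        byte[] sku = build(12L, 345L, 6789L);                     // level-1 + level-2 + level-3 IDs
        System.out.println(matchesPrefix(sku, build(12L)));       // true
        System.out.println(matchesPrefix(sku, build(12L, 345L))); // true
        System.out.println(matchesPrefix(sku, build(13L)));       // false
    }
}
```

Fixed-width encoding is what makes the prefix test safe: with variable-length string IDs, the prefix for level-1 category `1` would also match category `12`. Against a real table the prefix would typically be passed to `Scan.setRowPrefixFilter`.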

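Combining such dimension-concatenated rowkeys with prefix scans locates the target data efficiently and avoids reading the whole HBase table into memory for filtering, which gives fast queries, low resource usage, and short response times. However, because this rowkey design embeds business logic, the rowkeys are distributed unevenly. Whichever pre-splitting algorithm is used, HexStringSplit, UniformSplit, or another, a concentrated distribution of values in the first business dimension of the rowkey skews the table's regions: large volumes of data pile up on a single RegionServer, that node bears excessive storage and query load, and the nightly production data push is delayed.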
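A simplified model makes the skew visible. The sketch assumes 16 regions pre-split evenly over the first rowkey byte (close in spirit to `UniformSplit`, which divides the raw byte space evenly, but not HBase's code) and level-1 category IDs that are consecutive small `long`s. Every such ID begins with byte `0x00`, so every row lands in the first region. `SkewDemo` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Simplified model: regions pre-split evenly over the first (unsigned) rowkey byte. */
class SkewDemo {
    /** Region index of a rowkey when the 256 first-byte values are spread evenly over numRegions. */
    static int regionOf(byte[] rowkey, int numRegions) {
        int firstByte = rowkey[0] & 0xFF;   // HBase compares bytes as unsigned values
        return firstByte * numRegions / 256;
    }

    /** Rows per region for level-1 IDs 1..numIds, each encoded as an 8-byte big-endian long. */
    static int[] distribution(int numIds, int numRegions) {
        int[] counts = new int[numRegions];
        for (long id = 1; id <= numIds; id++) {
            byte[] key = ByteBuffer.allocate(Long.BYTES).putLong(id).array();
            counts[regionOf(key, numRegions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // All 10000 rows fall into region 0; the other 15 regions stay empty.
        System.out.println(Arrays.toString(distribution(10_000, 16)));
    }
}
```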

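Given this state of HBase storage, the data skew and hotspot problems must be addressed, which makes a data skew governance method for HBase tables whose rowkeys are composed of business dimensions both urgent and important.

Summary of the Invention

The object of the present invention is to overcome the shortcomings of the prior art by providing a data skew governance method for HBase tables whose rowkeys are composed of business dimensions. The method resolves data skew and hotspots in such tables and prevents any single node of the HBase cluster from bearing excessive storage and access load.

To solve the problems of the prior art, the present invention adopts the following technical solution: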

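1) A data skew governance method for HBase tables whose rowkeys are composed of business dimensions, characterized in that the Export tool is used to scan the original skewed HBase table and generate a snapshot, and the snapshot files are stored in a readable directory on the HDFS distributed file system.

2) According to the above technical solution, the skew governance tool mainly comprises a snapshot reading module and a data re-push module; reading the skewed HBase data and rewriting the governed data are performed as two independent steps.

3) According to the above technical solution, the snapshot reading module reads the snapshot of the original skewed HBase table with the MapReduce computing model and uses a byte-level pre-splitting algorithm to divide the rowkey range of each region into equal parts, so that multiple map tasks read a single region in parallel.

4) According to the above technical solution, the snapshot reading module deserializes the byte data in the table according to the data-type metadata of the business fields, applying a different decoding algorithm to each data type, to recover original data with complete business meaning.

5) According to the above technical solution, the snapshot reading module uses the horizontal tab character as the column separator of each original business data row and saves the rows as strings in intermediate Text files.

6) According to the above technical solution, the snapshot reading module compresses the intermediate Text files in Bzip2 format.

7) According to the above technical solution, the snapshot reading module counts the total number of rows of the original HBase table while reading and parsing the skewed data.

8) According to the above technical solution, the data re-push module reads the Bzip2-compressed intermediate Text files with the MapReduce computing model, divides each file into several parts of equal data volume, and reads them concurrently with multiple map tasks.

9) According to the above technical solution, the data re-push module reverses the original value of the first business dimension field of the rowkey to scatter (hash) it.

10) According to the above technical solution, the data re-push module encodes the reversed rowkey together with the other original business fields, writes them into HFiles, and pushes the HFiles directly into the HBase directory via BulkLoad.

11) According to the above technical solution, the data re-push module counts the total number of rows after governance during the re-push and fetches the row data of a randomly selected rowkey, to verify that the data volume is consistent and the data correct before and after governance.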
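The consistency check in step 11) can be sketched in memory. In this hypothetical `SkewFixVerifier`, two maps stand in for the old and new tables and `transformKey` stands in for the rowkey rewrite; a real job would take the row counts from MapReduce counters and fetch the sampled row with an HBase `Get`, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.UnaryOperator;

/** Hypothetical in-memory model of the before/after consistency check. */
class SkewFixVerifier {
    /**
     * True if both tables hold the same number of rows and one randomly chosen original row
     * is found, with identical columns, under its transformed rowkey in the new table.
     */
    static boolean verify(Map<String, Map<String, String>> before,
                          Map<String, Map<String, String>> after,
                          UnaryOperator<String> transformKey,
                          Random random) {
        if (before.size() != after.size()) {
            return false;                       // total row counts differ
        }
        if (before.isEmpty()) {
            return true;                        // nothing to sample
        }
        List<String> keys = new ArrayList<>(before.keySet());
        String sampled = keys.get(random.nextInt(keys.size()));
        return before.get(sampled).equals(after.get(transformKey.apply(sampled)));
    }

    public static void main(String[] args) {
        // Stand-in rowkey rewrite for the demo only; the real tool bit-reverses the first field.
        UnaryOperator<String> reverse = k -> new StringBuilder(k).reverse().toString();
        Map<String, Map<String, String>> before = new HashMap<>();
        before.put("12|345", Collections.singletonMap("sales", "10"));
        before.put("13|346", Collections.singletonMap("sales", "20"));
        Map<String, Map<String, String>> after = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> e : before.entrySet()) {
            after.put(reverse.apply(e.getKey()), e.getValue());
        }
        System.out.println(verify(before, after, reverse, new Random(42))); // true
    }
}
```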

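Compared with the prior art, the HBase data skew governance method provided by the invention has the following beneficial technical effects. Taking a snapshot of the original skewed HBase table directly at the HFile level, saving it to an HDFS directory, and then reading the skewed data with the snapshot reading module avoids full-table scans against the online HBase cluster that would overload its RegionServers. Pre-splitting each region's rowkey range with the byte-level pre-splitting algorithm lets multiple map tasks read a single region in parallel, cutting snapshot reading time several-fold. Deserializing the byte data according to the original HBase field types keeps the data correct and avoids garbled, distorted values caused by decoding with the wrong encoding. Storing the intermediate Text files in Bzip2 format reduces their HDFS footprint to one tenth. When the data is pushed into the new HBase table, the first business field of the rowkey is reversed before the rowkey is assembled, which scatters different business field values as widely as possible and avoids clustering, while still supporting exact, fast queries on flexible combinations of business dimensions. Finally, comparing the total row count and a randomly selected row of the HBase table before and after governance ensures that the data volume is consistent and the data correct.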
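The garbling risk mentioned above is easy to reproduce. The sketch assumes the column type metadata is available as a type name per column (`int`, `long`, `double` and `string` are illustrative choices) and that numbers were written in HBase's big-endian `Bytes.toBytes` layout, which `java.nio.ByteBuffer` also uses by default. `TypedDecoder` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Hypothetical type-aware decoder for raw HBase cell bytes. */
class TypedDecoder {
    /** Decodes raw bytes according to a declared type name; numbers are big-endian. */
    static String decode(byte[] raw, String type) {
        switch (type.toLowerCase(Locale.ROOT)) {
            case "int":
                return String.valueOf(ByteBuffer.wrap(raw).getInt());
            case "long":
                return String.valueOf(ByteBuffer.wrap(raw).getLong());
            case "double":
                return String.valueOf(ByteBuffer.wrap(raw).getDouble());
            case "string":
                return new String(raw, StandardCharsets.UTF_8);
            default:
                throw new IllegalArgumentException("Unknown column type: " + type);
        }
    }

    public static void main(String[] args) {
        byte[] sales = ByteBuffer.allocate(Long.BYTES).putLong(1024L).array();
        System.out.println(decode(sales, "long"));                  // 1024
        // Decoding the same bytes as text yields control characters, not "1024".
        System.out.println(decode(sales, "string").equals("1024")); // false
    }
}
```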

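Brief Description of the Drawings

Figure 1 is a schematic diagram of the overall process of the invention.

Figure 2 shows the region data distribution of an HBase table before skew governance according to the invention.

Figure 3 shows the region data distribution of an HBase table after skew governance according to the invention.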


II. Production Code

1. HBaseSnapshotRead: snapshot reading module

Main program

// Imports are a best-effort reconstruction. OptionsProcessor, RegionHexStringExtendSplitter,
// ReverseRowkeyMapper and ParallelizeTableMapReduceUtil are project classes (the first two are not shown).
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Reads an HBase snapshot with several mappers per region and writes hex-encoded text lines. */
public class GetHbaseTool extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(GetHbaseTool.class);
    private static OptionsProcessor optionsProcessor = null;

    public static void main(String[] args) throws Exception {
        config(args);
        System.out.println("get hbase !");
        int res = ToolRunner.run(new Configuration(), new GetHbaseTool(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String hdfsUrl = optionsProcessor.getHdfsUrl();
        //String hdfsUrl = "hdfs://ns22017";

        JobConf conf = new JobConf();
        logger.warn("fs.defaultFS:" + hdfsUrl);
        conf.set("fs.defaultFS", hdfsUrl);
        conf.setInt("mapreduce.map.cpu.vcores", 2);
        conf.set("mapreduce.map.memory.mb", "4096");
        conf.set("mapreduce.map.java.opts", "-Xmx3g");
        conf.set("mapreduce.task.timeout", "1800000");

        Scan scan = new Scan();
        scan.setCaching(5000);
        scan.setCacheBlocks(false);  // a one-off full read should not populate the block cache

        // Optional row range; empty strings read the whole snapshot.
        String startRow = "";
        String stopRow = "";
        if (!startRow.isEmpty()) {
            scan.setStartRow(startRow.getBytes());
        }
        if (!stopRow.isEmpty()) {
            scan.setStopRow(stopRow.getBytes());
        }

        String hbaseSplit = optionsProcessor.getSplitAlgorithm();
        //String hbaseSplit = "UniformSplit";
        String snapshotName = optionsProcessor.getSnapshotName();
        //String snapshotName = "sanpshot_HB_IBD_SHOP_DIAGNOSIS_INDICATOR_MARKETINGTOOLS";
        Integer numSplitsPerRegion = optionsProcessor.getNumSplitsPerRegion();
        //Integer numSplitsPerRegion = 3;
        String hbaseRootDir = optionsProcessor.getHbaseRootDir();
        //String hbaseRootDir = "hdfs://ns22017/user/ads_sz_load/hfile_convert/origin";
        String outputPath = optionsProcessor.getOutputPath();
        //String outputPath = "/user/ads_sz_load/hbasetohive/result/";
        String tmpPath = optionsProcessor.getTmpPath();
        //String tmpPath = "/user/ads_sz_load/hbasetohive/tmp/snapshot";
        String firstRowKeyType = optionsProcessor.getFirstRowKeyType();
        //String firstRowKeyType = "long";
        String compressionAlgorithm = optionsProcessor.getCompressionAlgorithm();
        //String compressionAlgorithm = "Bzip2"; // "Gzip", "Bzip2" or "ZSTD"; any other value disables compression

        conf.setFloat("hbase.tablesnapshotinputformat.locality.cutoff.multiplier", 0.8f);
        conf.setInt("hbase.snapshot.thread.pool.max", 8);
        // Read by ParallelizeTableSnapshotInputFormatImpl to size its block-location thread pool.
        conf.setInt("hbase.snapshot.thread.pool", 8);
        // Must point at the HBase root directory that holds the exported snapshot.
        conf.set("hbase.rootdir", hbaseRootDir);
        conf.set("first.rowkey.type", firstRowKeyType);

        Path outPath = new Path(hdfsUrl + outputPath);
        Path tmp = new Path(hdfsUrl + tmpPath);
        logger.info("outPath:" + outPath);
        logger.info("tmpPath:" + tmp);

        // Remove previous output so the job does not fail on an existing directory.
        try {
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true);
            }
            fileSystem.close();
        } catch (IOException e1) {
            logger.error("Failed to delete " + outPath, e1);
        }

        Job job = Job.getInstance(conf, "GetHbaseTool");
        job.setJarByClass(GetHbaseTool.class);

        RegionSplitter.SplitAlgorithm split;
        if ("HexStringSplit".equals(hbaseSplit)) {
            split = new RegionHexStringExtendSplitter();  // project extension of HexStringSplit (not shown)
        } else if ("UniformSplit".equals(hbaseSplit)) {
            split = new RegionSplitter.UniformSplit();
        } else {
            // Fail fast instead of a NullPointerException on split.getClass() below.
            throw new IllegalArgumentException("Unsupported split algorithm: " + hbaseSplit);
        }
        logger.info("SplitAlgorithm:" + split.getClass().getName());
        job.setNumReduceTasks(0);  // map-only job
        logger.info("reduce size:" + job.getNumReduceTasks());
        long startTime = System.currentTimeMillis();

        Class<? extends TableMapper> mapper = ReverseRowkeyMapper.class;
        Class<?> outputKeyClass = Text.class;
        Class<?> outputValueClass = Text.class;
        logger.info("format:" + mapper.getSimpleName());
        FileOutputFormat.setOutputPath(job, outPath);

        // Compress the output files
        if ("Gzip".equals(compressionAlgorithm)) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        } else if ("Bzip2".equals(compressionAlgorithm)) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        } else if ("ZSTD".equals(compressionAlgorithm)) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, ZStandardCodec.class);
        }

        ParallelizeTableMapReduceUtil.initTableSnapshotMapperJob(
                snapshotName,        // name of the snapshot (of a table) to read from
                scan,                // Scan instance controlling column families and attributes
                mapper,              // mapper class
                outputKeyClass,      // mapper output key
                outputValueClass,    // mapper output value
                job,                 // the current job to adjust
                false,               // do not ship HBase jars via the distributed cache (tmpjars)
                tmp,                 // temporary directory to restore the snapshot into
                split,               // split algorithm: UniformSplit or the HexStringSplit extension
                numSplitsPerRegion); // how many input splits to generate per region

        boolean b = job.waitForCompletion(true);
        long endTime = System.currentTimeMillis();
        logger.info("start:" + startTime);
        logger.info("end:" + endTime);
        logger.info("spent time (s):" + (endTime - startTime) / 1000);
        return b ? 0 : 1;
    }

    private static void config(String[] args) {
        optionsProcessor = OptionsProcessor.getOptionsProcessor();
        optionsProcessor.processOptions(args);
    }
}

Data reading and transformation mapper

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * Reads one snapshot row, bit-reverses the first rowkey field and writes the line
 * "rowkeyHex \t qualifier:valueHex \t qualifier:valueHex \t ...".
 * StringHexUtils is a project helper (not shown).
 */
public class ReverseRowkeyMapper extends TableMapper<Text, Text> {
    private String firstRowkeyType;
    private final Text writeKey = new Text();
    private final Text writeValue = new Text();

    @Override
    protected void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        firstRowkeyType = configuration.get("first.rowkey.type", "");
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        if (value == null || value.isEmpty()) {
            return;  // listCells() returns null for an empty Result
        }

        // Work on a copy: key.get() exposes the backing array, which must not be changed in place.
        byte[] rowkey = reverseFirstField(key.copyBytes(), firstRowkeyType);

        StringBuilder res = new StringBuilder();
        for (Cell cell : value.listCells()) {
            String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            // Alternative: decode with Bytes.toString(...) and replace \n, \r, \t, \1 and \N with spaces.
            // Hex encoding is used instead so any binary value survives the text round trip.
            byte[] colValue = CellUtil.cloneValue(cell);
            res.append(colName).append(':').append(StringHexUtils.bytesToHexString(colValue)).append('\t');
        }

        // Alternative: writeKey.set(Bytes.toString(rowkey)) decodes with the HBase utility instead.
        writeKey.set(StringHexUtils.bytesToHexString(rowkey));
        writeValue.set(res.toString());
        // TextOutputFormat separates key and value with "\t" by default.
        context.write(writeKey, writeValue);
    }

    /** Bit-reverses the first rowkey field in place (int = 4 bytes, long = 8 bytes). */
    public byte[] reverseFirstField(byte[] rowkey, String firstRowkeyType) {
        int firstFieldLength;
        if ("INT".equalsIgnoreCase(firstRowkeyType)) {
            firstFieldLength = Bytes.SIZEOF_INT;
        } else if ("LONG".equalsIgnoreCase(firstRowkeyType)) {
            firstFieldLength = Bytes.SIZEOF_LONG;
        } else {
            // STRING needs separate parsing and is not implemented yet; other types are left unchanged.
            return rowkey;
        }
        if (rowkey.length < firstFieldLength) {
            return rowkey;  // too short to hold the declared field
        }

        if (firstFieldLength == Bytes.SIZEOF_INT) {
            Bytes.putInt(rowkey, 0, Integer.reverse(Bytes.toInt(rowkey, 0)));
        } else {
            Bytes.putLong(rowkey, 0, Long.reverse(Bytes.toLong(rowkey, 0)));
        }
        return rowkey;
    }
}
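`reverseFirstField` relies on `Integer.reverse` / `Long.reverse`, which reverse the bit order of the value (not its decimal digits). The standalone sketch below, using only the JDK and a hypothetical `BitReverseDemo` class, checks the two properties the mapper depends on: consecutive IDs that all share a leading byte are spread over all 256 leading-byte values, and the reversal is its own inverse, so a client can still prefix-scan one level-1 category by reversing the query ID first.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical demo of the bit reversal used by reverseFirstField. */
class BitReverseDemo {
    /** Most significant byte of v, i.e. the first rowkey byte under big-endian encoding. */
    static int firstByte(long v) {
        return (int) ((v >>> 56) & 0xFF);
    }

    /** Number of distinct first bytes among IDs 1..n, with or without Long.reverse. */
    static int distinctLeadingBytes(int n, boolean reversed) {
        Set<Integer> seen = new HashSet<>();
        for (long id = 1; id <= n; id++) {
            seen.add(firstByte(reversed ? Long.reverse(id) : id));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctLeadingBytes(10_000, false)); // 1: every ID starts with 0x00
        System.out.println(distinctLeadingBytes(10_000, true));  // 256: leading byte now comes from the low bits
        long id = 123456L;
        System.out.println(Long.reverse(Long.reverse(id)) == id); // true: reversal is its own inverse
    }
}
```

The trade-off is ordering: numerically adjacent level-1 IDs are no longer adjacent in the table, so a query covering a range of level-1 categories turns into one prefix scan per category.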

Parallel-read utility class

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapreduce.Job;

public class ParallelizeTableMapReduceUtil extends TableMapReduceUtil {

    /**
     * Sets up the job for reading from a table snapshot. It bypasses hbase servers
     * and read directly from snapshot files.
     *
     * @param snapshotName       The name of the snapshot (of a table) to read from.
     * @param scan               The scan instance with the columns, time range etc.
     * @param mapper             The mapper class to use.
     * @param outputKeyClass     The class of the output key.
     * @param outputValueClass   The class of the output value.
     * @param job                The current job to adjust. Make sure the passed job is
     *                           carrying all necessary HBase configuration.
     * @param addDependencyJars  upload HBase jars and jars for any of the configured
     *                           job classes via the distributed cache (tmpjars).
     * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. Current user should
     *                           have write permissions to this directory, and this should not be a subdirectory of rootdir.
     *                           After the job is finished, restore directory can be deleted.
     * @param splitAlgo          algorithm to split
     * @param numSplitsPerRegion how many input splits to generate per one region
     * @throws IOException When setting up the details fails.
     * @see ParallelizeTableSnapshotInputFormat
     */
    public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
                                                  Class<? extends TableMapper> mapper,
                                                  Class<?> outputKeyClass,
                                                  Class<?> outputValueClass, Job job,
                                                  boolean addDependencyJars, Path tmpRestoreDir,
                                                  RegionSplitter.SplitAlgorithm splitAlgo,
                                                  int numSplitsPerRegion)
            throws IOException {
        ParallelizeTableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
                numSplitsPerRegion);
        initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
                outputValueClass, job, addDependencyJars, false, ParallelizeTableSnapshotInputFormat.class);
        resetCacheConfig(job.getConfiguration());
    }
}

Parallel-read InputFormat class

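import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

/** TableSnapshotInputFormat whose splits come from ParallelizeTableSnapshotInputFormatImpl. */
public class ParallelizeTableSnapshotInputFormat extends TableSnapshotInputFormat {

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        List<InputSplit> results = new ArrayList<InputSplit>();
        for (ParallelizeTableSnapshotInputFormatImpl.InputSplit split
                : ParallelizeTableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
            results.add(new TableSnapshotRegionSplit(split));
        }
        return results;
    }
}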

Parallel-read InputFormatImpl class

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;

/**
 * Variant of TableSnapshotInputFormatImpl that computes the HDFS block distribution of all
 * regions concurrently before cutting each region into numSplits sub-ranges.
 */
public class ParallelizeTableSnapshotInputFormatImpl extends TableSnapshotInputFormatImpl {
    private static final Log LOG = LogFactory.getLog(ParallelizeTableSnapshotInputFormatImpl.class);
    private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";

    private static String getSnapshotName(Configuration conf) {
        String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
        if (snapshotName == null) {
            throw new IllegalArgumentException("Snapshot name must be provided");
        }
        return snapshotName;
    }

    public static List<InputSplit> getSplits(Configuration conf) throws IOException {
        String snapshotName = getSnapshotName(conf);

        Path rootDir = FSUtils.getRootDir(conf);
        FileSystem fs = rootDir.getFileSystem(conf);

        SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
        List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

        // TODO: mapred does not support scan as input API. Work around for now.
        Scan scan = extractScanFromConf(conf);
        // the temp dir where the snapshot is restored
        Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

        RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);
        int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

        return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
    }

    public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
                                             List<HRegionInfo> regionManifests, Path restoreDir,
                                             Configuration conf, RegionSplitter.SplitAlgorithm sa,
                                             int numSplits) throws IOException {
        // load table descriptor
        HTableDescriptor htd = manifest.getTableDescriptor();
        Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

        // Added: compute the block distribution of every region concurrently up front instead of
        // calling HRegion.computeHDFSBlocksDistribution region by region inside the loop.
        Map<String, Future<HDFSBlocksDistribution>> blockDistributions =
                getHDFSBlockDistributionForAllRegions(conf, htd, tableDir, regionManifests);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (HRegionInfo hri : regionManifests) {
            HDFSBlocksDistribution blockDistribution;
            try {
                blockDistribution = blockDistributions.get(hri.getEncodedName()).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while computing block locations", e);
            } catch (ExecutionException e) {
                LOG.warn("No block distribution for region " + hri.getEncodedName(), e);
                blockDistribution = null;
            }
            if (blockDistribution == null) {
                blockDistribution = new HDFSBlocksDistribution();
            }
            // compute HDFS locations from snapshot files (which will get the locations for
            // referred hfiles); keep at most three hosts per split
            List<String> hosts = getBestLocations(conf, blockDistribution);
            hosts = hosts.subList(0, Math.min(3, hosts.size()));

            if (numSplits > 1) {
                byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
                for (int i = 0; i < sp.length - 1; i++) {
                    if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])) {
                        // Clamp the scan to the [sp[i], sp[i + 1]) sub-range of this region.
                        Scan boundedScan = new Scan(scan);
                        if (scan.getStartRow().length == 0) {
                            boundedScan.withStartRow(sp[i]);
                        } else {
                            boundedScan.withStartRow(
                                    Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
                        }
                        if (scan.getStopRow().length == 0) {
                            boundedScan.withStopRow(sp[i + 1]);
                        } else {
                            boundedScan.withStopRow(
                                    Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
                        }
                        splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
                    }
                }
            } else if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
                    hri.getStartKey(), hri.getEndKey())) {
                splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
            }
        }
        return splits;
    }

    /**
     * Added: submits one HRegion.computeHDFSBlocksDistribution task per region to a fixed
     * thread pool sized by "hbase.snapshot.thread.pool" and returns the futures by encoded name.
     */
    private static Map<String, Future<HDFSBlocksDistribution>> getHDFSBlockDistributionForAllRegions(
            final Configuration conf, final HTableDescriptor htd, final Path tableDir,
            List<HRegionInfo> regionManifests) {
        Map<String, Future<HDFSBlocksDistribution>> distribution = new HashMap<>();
        int nrThreads = conf.getInt("hbase.snapshot.thread.pool", 8);
        ExecutorService executor = Executors.newFixedThreadPool(nrThreads);
        try {
            for (final HRegionInfo hri : regionManifests) {
                distribution.put(hri.getEncodedName(), executor.submit(
                        new Callable<HDFSBlocksDistribution>() {
                            @Override
                            public HDFSBlocksDistribution call() throws Exception {
                                return HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir);
                            }
                        }));
            }
        } finally {
            // Submitted tasks still run to completion; the pool's threads exit afterwards.
            executor.shutdown();
        }
        return distribution;
    }
}
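To see what the split loop does without a cluster, the sketch below imitates it with plain byte arithmetic: it treats fixed-width keys as unsigned integers, cuts a region's `[start, end)` range into equal parts (similar in spirit to `UniformSplit`, not its actual code), and clamps a sub-range to a scan's start and stop rows, where an empty array means unbounded as in HBase's `Scan`. `RegionSubSplitter` and its methods are hypothetical.

```java
import java.math.BigInteger;

/** Hypothetical sketch: split a region's key range evenly and clamp sub-ranges to a scan. */
class RegionSubSplitter {
    /** Unsigned lexicographic comparison, the order HBase uses for rowkeys. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /**
     * Splits [start, end) into n pieces and returns the n + 1 boundaries. Non-empty keys are
     * assumed to be exactly `width` bytes; an empty start/end means the start/end of the key space.
     */
    static byte[][] split(byte[] start, byte[] end, int n, int width) {
        BigInteger lo = start.length == 0 ? BigInteger.ZERO : new BigInteger(1, start);
        BigInteger hi = end.length == 0 ? BigInteger.ONE.shiftLeft(8 * width) : new BigInteger(1, end);
        BigInteger span = hi.subtract(lo);
        byte[][] bounds = new byte[n + 1][];
        bounds[0] = start;  // the region's own boundaries are kept verbatim
        bounds[n] = end;
        for (int i = 1; i < n; i++) {
            BigInteger b = lo.add(span.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n)));
            bounds[i] = toFixedWidth(b, width);
        }
        return bounds;
    }

    /** Big-endian, zero-padded encoding of v (0 <= v < 2^(8*width)) in exactly `width` bytes. */
    static byte[] toFixedWidth(BigInteger v, int width) {
        byte[] raw = v.toByteArray();  // may carry an extra leading 0x00 sign byte
        byte[] out = new byte[width];
        int copy = Math.min(raw.length, width);
        System.arraycopy(raw, raw.length - copy, out, width - copy, copy);
        return out;
    }

    /** Later of two start keys; an empty start key means "from the beginning". */
    static byte[] maxStart(byte[] a, byte[] b) {
        return compare(a, b) >= 0 ? a : b;
    }

    /** Earlier of two stop keys; an empty stop key means "to the end". */
    static byte[] minStop(byte[] a, byte[] b) {
        if (a.length == 0) {
            return b;
        }
        if (b.length == 0) {
            return a;
        }
        return compare(a, b) <= 0 ? a : b;
    }

    public static void main(String[] args) {
        byte[][] bounds = split(new byte[0], new byte[0], 4, 1);  // whole 1-byte key space
        for (byte[] b : bounds) {
            System.out.print(b.length == 0 ? "<empty> " : String.format("%02x ", b[0] & 0xFF));
        }
        System.out.println();  // prints: <empty> 40 80 c0 <empty>
        // Clamp the sub-range [0x40, 0x80) to a scan over [0x50, unbounded).
        byte[] s = maxStart(new byte[]{0x50}, bounds[1]);
        byte[] e = minStop(new byte[0], bounds[2]);
        System.out.printf("%02x %02x%n", s[0] & 0xFF, e[0] & 0xFF);  // 50 80
    }
}
```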

2. HBaseBulkload: data re-push module
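In this module, `HFileOutputFormat2.configureIncrementalLoad` (called in the main program) sets the number of reduce tasks to the table's region count, installs a `TotalOrderPartitioner` on the regions' start keys and, for `Put` map output, a `PutSortReducer`, so each reducer writes sorted HFiles for exactly one region. The hypothetical `RegionRouter` below reproduces only the routing rule: a binary search for the last region whose start key is less than or equal to the rowkey, comparing bytes as unsigned values as HBase does.

```java
/** Hypothetical sketch of routing rowkeys to regions by their start keys. */
class RegionRouter {
    /** Unsigned lexicographic comparison, the order HBase uses for rowkeys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /**
     * Index of the region owning rowkey. startKeys must be sorted and begin with the empty
     * array, which is the first region's start key.
     */
    static int regionFor(byte[][] startKeys, byte[] rowkey) {
        int lo = 0;
        int hi = startKeys.length - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;             // upper middle, so the range always shrinks
            if (compareUnsigned(startKeys[mid], rowkey) <= 0) {
                lo = mid;                              // region mid starts at or before rowkey
            } else {
                hi = mid - 1;                          // region mid starts after rowkey
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        byte[][] startKeys = { {}, {0x40}, {(byte) 0x80}, {(byte) 0xC0} };
        System.out.println(regionFor(startKeys, new byte[]{0x00, 0x01}));         // 0
        System.out.println(regionFor(startKeys, new byte[]{0x40}));               // 1
        System.out.println(regionFor(startKeys, new byte[]{0x7F, (byte) 0xFF}));  // 1
        System.out.println(regionFor(startKeys, new byte[]{(byte) 0xFF}));        // 3
    }
}
```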

Main program

// Imports are a best-effort reconstruction; OptionsProcessor and LoadHbaseMapper are project classes.
import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Converts the intermediate text files into HFiles and bulk loads them into the target table. */
public class LoadHbaseTool extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(LoadHbaseTool.class);
    private static OptionsProcessor optionsProcessor = null;

    public static void main(String[] args) throws Exception {
        config(args);
        System.out.println("load hbase !");
        int res = ToolRunner.run(new Configuration(), new LoadHbaseTool(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String hdfsUrl = optionsProcessor.getHdfsUrl();
        //String hdfsUrl = "hdfs://ns22017";

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", hdfsUrl);
        conf.setInt("mapreduce.map.cpu.vcores", 2);
        conf.set("mapreduce.map.memory.mb", "4096");
        conf.set("mapreduce.map.java.opts", "-Xmx3072M");
        conf.setInt("mapreduce.reduce.cpu.vcores", 2);
        conf.set("mapreduce.reduce.memory.mb", "4096");
        conf.set("mapreduce.reduce.java.opts", "-Xmx3072M");

        // Important: minimum input split size for the intermediate text files (about 1 GB per map task).
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1024000000L);

        String erp = optionsProcessor.getErp();
        // String erp = "shaoxiankai";
        String instanceName = optionsProcessor.getInstanceName();
        //String instanceName = "SL1000000003145";
        String accesskey = optionsProcessor.getAccesskey();
        //String accesskey = "MZYH5UIKEY3BUI5Y2N6AJ5YXCA";
        String input = optionsProcessor.getInputPath();
        //String input = "hdfs://ns22017/user/ads_sz_load/hbasetohive/result";
        String output = optionsProcessor.getOutputPath();
        //String output = "hdfs://ns22017/user/ads_sz_load/hivetohbase/result";
        String tableName = optionsProcessor.getTableName();
        //String tableName = "ibd_test:HB_IBD_SHOP_DIAGNOSIS_INDICATOR_TEST";

        // Platform-specific (bdp.*) connection settings.
        conf.set("bdp.hbase.erp", erp);
        conf.set("bdp.hbase.instance.name", instanceName);
        conf.set("bdp.hbase.accesskey", accesskey);
        conf.setInt("hbase.client.operation.timeout", 600000);
        conf.setInt("hbase.rpc.timeout", 300000);
        // HFileOutputFormat2 starts a new HFile once the current one reaches this size (200 GiB here),
        // so in practice each reducer writes one HFile per column family.
        conf.set("hbase.hregion.max.filesize", String.valueOf(20 * 10 * 1024 * 1024 * 1024L));
        // LoadIncrementalHFiles rejects a load with more HFiles than this per region and family (default 32).
        conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);

        Job job = Job.getInstance(conf, "LoadHbaseTool");
        job.setJarByClass(LoadHbaseTool.class);

        job.setMapperClass(LoadHbaseMapper.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);

        // Several input directories may be given, separated by commas.
        String[] inputPathStrs = input.split(",");
        Path[] paths = new Path[inputPathStrs.length];
        for (int i = 0; i < inputPathStrs.length; i++) {
            paths[i] = new Path(inputPathStrs[i].trim());
        }
        Path outPath = new Path(output);
        logger.info("inPath:" + input);
        logger.info("outPath:" + output);

        try {
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true);
            }
            fileSystem.close();
        } catch (IOException e1) {
            logger.error("Failed to delete " + outPath, e1);
        }
        FileInputFormat.setInputPaths(job, paths);
        FileOutputFormat.setOutputPath(job, outPath);

        TableName hbTableName = TableName.valueOf(tableName);
        // try-with-resources closes the connection even when the MR job fails.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             Table realTable = conn.getTable(hbTableName);
             RegionLocator regionLocator = conn.getRegionLocator(hbTableName)) {

            // One reducer per region, partitioned on the region start keys.
            HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);

            logger.info("mr start:" + new Date());
            boolean result = job.waitForCompletion(true);
            logger.info("mr end:" + new Date() + ", success=" + result);
            Counters counters = job.getCounters();
            for (TaskCounter e : TaskCounter.values()) {
                logger.info("job finished," + e.name() + ":" + counters.findCounter(e).getValue());
            }
            if (!result) {
                return 1;
            }

            // Bulk load the generated HFiles into HBase.
            // To test only the MR stage, comment out the lines below; enable them for a real load.
            // A real load must be reported (filed) in advance.
            logger.info("LoadIncrementalHFiles start:" + new Date());
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(outPath, admin, realTable, regionLocator);
            logger.info("LoadIncrementalHFiles end:" + new Date());
            return 0;
        }
    }

    private static void config(String[] args) {
        optionsProcessor = OptionsProcessor.getOptionsProcessor();
        optionsProcessor.processOptions(args);
    }
}
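The `split.minsize` setting above decides how many map tasks read each intermediate file. `FileInputFormat` sizes splits as `max(minSize, min(maxSize, blockSize))` and lets the last split of a file run up to 10% over (`SPLIT_SLOP = 1.1`). The hypothetical `SplitSizeEstimator` reproduces that arithmetic; it assumes a splittable input, which holds for Bzip2 but not for Gzip, whose files are always read by a single map task.

```java
/** Hypothetical re-implementation of FileInputFormat's split arithmetic for one splittable file. */
class SplitSizeEstimator {
    static final double SPLIT_SLOP = 1.1;  // the last split may be up to 10% larger

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Number of input splits (map tasks) for a file of fileLength bytes. */
    static int countSplits(long fileLength, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long remaining = fileLength;
        int splits = 0;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++;  // the tail split
        }
        return splits;
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // a common HDFS block size
        long minSize = 1024000000L;           // value set in LoadHbaseTool
        System.out.println(computeSplitSize(blockSize, minSize, Long.MAX_VALUE));            // 1024000000
        System.out.println(countSplits(5_000_000_000L, blockSize, minSize, Long.MAX_VALUE)); // 5
    }
}
```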

Data transformation mapper

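import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Parses "rowkeyHex \t qualifier:valueHex \t ..." lines and emits one Put per line into the
 * single column family "d". StringHexUtils is a project helper (not shown).
 */
public class LoadHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private static final byte[] FAMILY = Bytes.toBytes("d");
    private final ImmutableBytesWritable writeKey = new ImmutableBytesWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (value == null) {
            return;
        }
        // String.split drops trailing empty strings, so a trailing tab adds no extra item.
        String[] items = value.toString().split("\t");
        if (items.length == 0 || items[0].trim().isEmpty()) {
            return;
        }

        byte[] rowkey = StringHexUtils.HexStringToBytes(items[0]);
        Put put = new Put(rowkey);
        // Item 0 is the rowkey; the loop must include the last column (i < items.length).
        for (int i = 1; i < items.length; i++) {
            String item = items[i];
            int sep = item.lastIndexOf(':');  // hex values never contain ':'
            if (sep < 0) {
                continue;                     // skip malformed items
            }
            put.addColumn(FAMILY, Bytes.toBytes(item.substring(0, sep)),
                    StringHexUtils.HexStringToBytes(item.substring(sep + 1)));
        }

        if (!put.isEmpty()) {
            writeKey.set(rowkey);
            context.write(writeKey, put);
        }
    }
}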
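`StringHexUtils` is used by both mappers but not shown. The sketch below is a hypothetical stand-in (`HexLineCodec`), assuming two lowercase hex digits per byte, together with a parser for the intermediate line format `rowkeyHex \t qualifier:valueHex \t ...`. It splits each column on its last colon, since hex digits never contain one, and keeps the final column even when the line ends with a tab.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for StringHexUtils plus a parser for the intermediate text lines. */
class HexLineCodec {
    private static final char[] DIGITS = "0123456789abcdef".toCharArray();

    /** Encodes each byte as two lowercase hex digits. */
    static String bytesToHexString(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            out[2 * i] = DIGITS[(bytes[i] >> 4) & 0x0F];
            out[2 * i + 1] = DIGITS[bytes[i] & 0x0F];
        }
        return new String(out);
    }

    /** Decodes a hex string (either case) back to bytes. */
    static byte[] hexStringToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Odd-length hex string: " + hex);
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Invalid hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    /** Rowkey bytes from the first tab-separated field. */
    static byte[] parseRowkey(String line) {
        int tab = line.indexOf('\t');
        return hexStringToBytes(tab < 0 ? line : line.substring(0, tab));
    }

    /** Qualifier -> decoded value for every column field, including the last one. */
    static Map<String, byte[]> parseColumns(String line) {
        String[] items = line.split("\t");  // trailing empty strings are dropped
        Map<String, byte[]> columns = new LinkedHashMap<>();
        for (int i = 1; i < items.length; i++) {
            int colon = items[i].lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing ':' in column field: " + items[i]);
            }
            columns.put(items[i].substring(0, colon), hexStringToBytes(items[i].substring(colon + 1)));
        }
        return columns;
    }

    public static void main(String[] args) {
        String line = bytesToHexString(new byte[]{0x00, 0x0c})
                + "\tsales:" + bytesToHexString("10".getBytes(StandardCharsets.UTF_8))
                + "\tcity:" + bytesToHexString("SZ".getBytes(StandardCharsets.UTF_8))
                + "\t";                                   // trailing tab, as the reading mapper writes it
        System.out.println(parseColumns(line).keySet());  // [sales, city]
    }
}
```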