HBase表数据倾斜治理_采用rowkey反转策略的bulkload

HBase表数据倾斜治理_采用rowkey反转策略的bulkload

HBase表数据倾斜治理(1)_shell命令实现bulkload中已经介绍bulkload的原理以及使用shell命令实现bulkload的方法。

数据存储到HBase当中非常容易发生数据倾斜,也就是各个region当中存储的数据数量差距较大。常用于避免HBase表数据倾斜的方法是:预分区+随机散列。此处不再赘述如何实现预分区,重点关注通过Mapper类实现rowkey的随机散列,并将其与bulkload相结合。

1.设计reverse散列策略

源文件fellow.csv数据如下,总共5列数据:id(int),birthday(String),name(String),sex(String),work(String)。

38343,2022/4/24,jiazhengyang,男,正式
374836,2022/4/4,zengqingfan,男,实习
34783484,2022/3/18,lijun,男,正式
37438,2022/4/11,lijiazheng,男,实习
2384783,2022/3/16,qiuyuchen,男,正式

将int类型的id进行reverse然后转化为字节数组1,将String类型的birthday转化为子节数组2,然后将数组1和数组2拼接在一起组成rowkey。这样就可以保证在有相应优化需求时直接拆出rowkey的前4个字节得到id。

2.准备工作

HBase表数据倾斜治理(1)_shell命令实现bulkload中的准备工作相同,只不过上次使用shell进行bulkload无法随意设计rowkey,使用java变成可以更加灵活地设计rowkey组成规则。

  • 开启hadoop和hbase
  • 创建csv数据文件并上传到HDFS系统
  • 创建Hbase表:create 'fellowjava','d'

3.创建Mapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class BulkloadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

//重要:Mapper当中map()方法的Text value输入参数默认是Hfile源文件当中的一行数据,也就是Hbase表中的一行。
//value的结构为id+birthday+name+sex+work,设计id+birthday组成rowkey,name、sex、work作为列族D的列。
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String values = value.toString();
String[] lines = values.split(",");

byte[] rowkey = generateRowkey(lines[0], lines[1]);
byte[] columnFamily = "d".getBytes();
byte[] qualifierName = "name".getBytes();
byte[] qualifierSex = "sex".getBytes();
byte[] qualifierWork = "Work".getBytes();

ImmutableBytesWritable putRowKey = new ImmutableBytesWritable(rowkey);
Put put = new Put(rowkey);
put.add(columnFamily, qualifierName, generateValue(lines[2]));
put.add(columnFamily, qualifierSex, generateValue(lines[3]));
put.add(columnFamily, qualifierWork, generateValue(lines[4]));

//重要:该方法执行一次,即表示在HFile文件中写入一行数据
context.write(putRowKey, put);
}

//将id和birthday转化为byte[],并将id进行reverse,然后拼接在一起组成rowkey,达到散列的目的,避免数据倾斜。
private byte[] generateRowkey(String id, String birthday) {

byte[] newBytes = null;

try {
//先将id转化成Interger类型,然后进行反转。
int d = Integer.parseInt(id);
//重要:使用反射调用本类当中的reverse()方法,这样的话就可以输入不同的数据类型。
Object objectId = this.getClass().getMethod("reverse", Object.class).invoke(this, d);
int reverseId = (int) objectId;

byte[] idByte = Bytes.toBytes(reverseId);
byte[] birthdayByte = Bytes.toBytes(birthday);

//将两个byte[]拼接在一起。
int length = idByte.length + birthdayByte.length;
newBytes = new byte[length];
System.arraycopy(idByte, 0, newBytes, 0, 4);
System.arraycopy(birthdayByte, 0, newBytes, 4, birthdayByte.length);
}
catch (Exception e) {
e.printStackTrace();
}

return newBytes;
}

//将value字符串转化为byte[]。
private byte[] generateValue(String value) {
return Bytes.toBytes(value);
}

//重要:要通过反射来调用的方法,访问级别必须是public。
//对Long、Interger、String三种类型的数据进行reverse。
public Object reverse(Object data) {
if (Long.class.equals(data.getClass())) {
return Long.reverse((long) data);
}
if (Integer.class.equals(data.getClass())) {
return Integer.reverse((int) data);
}
else {
return StringUtils.reverse((String) data);
}
}
}
  • Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中的四个泛型:
    • KEYIN表示输入数据行的key的数据类型:当输入HFile文件为字符串文件时没有作用;当输入HFile文件为HBase表时用ImmutableBytesWritable对象来装载rowkey。
    • VALUEIN表示输入数据行的value的数据类型:当输入HFile文件为字符串文件时用Text对象来装载整行字符数据;当输入HFile文件为HBase表时用Result对象来装载整行单元格对象。
    • KEYOUT表示输出数据行的key的数据类型:当输出HFile文件为字符串文件时用Text对象来装载字符数据;当输出HFile文件为HBase表时用ImmutableBytesWritable对象来装载rowkey。
    • VALUEOUT表示输出数据行的value的数据类型:当输出HFile文件为字符串文件时用Text对象来字符数据;当输出HFile文件为HBase表时用Put对象来装载整行单元格对象。
  • map方法中的context.write()方法执行一个就表示在输出HFile中写入一行数据,一个map方法中可以执行多次context.write()方法。执行n次,输出HFile文件的行数就是输入HFile文件行数的n倍。
  • 反射常用来调用入参类型为Object的方法,如上所编写的reverse方法。

4.创建Driver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BulkloadDriver {
public static void main(String[] args) throws Exception {

//这里创建HBase连接的目的是将数据放到目标表当中。
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("fellowjava"));
Admin admin = conn.getAdmin();

//设置相关类
Job job = Job.getInstance(conf, "jiazhengyang bulkload: fellowjava");
job.setJarByClass(BulkloadDriver.class);
job.setMapperClass(BulkloadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

//设置源Hfile路径和目标Hfile路径
String hdfsRoot = "hdfs://localhost:9005/";
String inputFile = hdfsRoot + "fellowtest";
String outputFile = hdfsRoot + "fellowjavahfile";
Path outputPath = new Path(outputFile);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, inputFile);
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.setOutputPath(job, outputPath);

//配置MR作业任务,将源Hfile数据以增量的形式加载到目标Hfile当中,并生成Hbase表元数据文件。
HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf("fellowjava")));

job.waitForCompletion(true);

//在MR作业完成后,将Hbase表元数据文件推送给Hbase的RegionServers,这样就可以通过Hbase操作表数据。
if (job.isSuccessful()) {
System.out.println("Hfile创建成功!");
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(outputPath, admin, table, conn.getRegionLocator(TableName.valueOf("fellowjava")));
}else {
System.out.println("Hfile创建失败!");
}
}
}

运行main方法即可。