HBase_HBase的批量写工具类BufferedMutator原理与应用实战

HBase_HBase的批量写工具类BufferedMutator原理与应用实战

1.背景

如果未开启HBase客户端api的缓冲区,那么一次put就是一个RPC操作,将客户端数据传输到服务器再等待返回结果,这非常耗时,小数据量传输写入还好,如果数据量多,每一个put都建立一次RPC连接和数据传输非常耗时。为解决该问题,HBase客户端推出了缓冲区配置,设置HTable.setAutoFlush(false)来开启缓冲区,只有当put到达一定容量后或者用户手动触发flush()时才会将缓冲区数据一次性提交到HBase服务器。但是HBase的api还是存在需要同步等待、最后关闭连接前手动触发flush()等问题。为了解决该问题,HBase推出了BufferedMutator,基本原理与缓冲区类似,可以避免频繁的RPC调用,还实现了异步提交、close()前自动flush()等功能。

2.BufferedMutator介绍

2.1 参数介绍

TableName(表名)

writeBufferSize(写缓存大小)

maxKeyValueSize(最大key-value大小)

ExecutorService(执行线程池)

ExceptionListener(监听BufferedMutator的异常)。

2.2 调用过程

1.构建put或者List[put]

2.调用BufferedMutator.mutate()方法

3.刷写到hbase

主要有三种触发方式:

1)显式调用BufferedMutator.flush()

2)发送结束的时候调用BufferedMutator.close()

3)它根据当前缓存大于了设置的写缓存大小,自动刷写:

1
2
3
while (undealtMutationCount.get() != 0 && currentWriteBufferSize.get() > writeBufferSize) {
backgroundFlushCommits(false);
}

这三种触发方式最终都是调用的backgroundFlushCommits方法。

2.3 使用场景

MapReduce Job的是BufferedMutator使用的典型场景。MapReduce作业需要批量写入,但是无法找到恰当的点执行flush。BufferedMutator接收MapReduce作业发送来的Put数据后,会根据writeBufferSize自动执行执行Batch Put操作,且会异步的提交Batch Put请求,这样MapReduce作业的执行也不会被打断,且MapReduce作业的每个线程将会拥有一个独立的BufferedMutator对象。

一个独立的BufferedMutator也可以用在大容量的在线系统上来执行批量Put操作,但是这时需要注意一些极端情况比如JVM异常或机器故障,此时有可能造成数据丢失。

3.商智数仓出数到HBase工具实战

HBaseTableWriter类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public class HBaseTableWriter extends Thread implements ITableWriter {
/** HBase表操作对象 */
private BufferedMutator table;

/** 数据处理完,是否停止线程 */
private boolean dieWhileDataOver;
/** 数据缓冲区 */
private Queue<Put> queue = new ConcurrentLinkedQueue<Put>();

/** hbase表导入条数 */
private int hbaseWriteRowNum = 0;
/** hbase表名称 */
private String tableName;
/** 是否启用线程 */
private boolean enableThread;

/** 缓存堆大小上限 */
private AtomicLong heapLimitSize = new AtomicLong(0);// 524288000;
// //500m
/** 数据大小统计 */
private AtomicLong putSize = new AtomicLong(0);

DataPickerConfiguration dpConf;


public HBaseTableWriter(String tableName) {
this.tableName = tableName;
this.dpConf = DataPickerConfiguration.getConfiguration();
}

/**
* 初始化各种类变量,如果必要启动线程
*/
@Override
public void init() throws IOException {
DataClient dataClient = ClientFactory.getDataClient(dpConf.getInstanceName(), dpConf.getAccessKey(), dpConf.getNamespace());
table = dataClient.getMutator(getTableName(), dpConf.getHbaseWriteBufferSize());

enableThread = DataPickerConfiguration.getConfiguration().isEnableMultiThread();
if (enableThread) {
this.start();
}
hbaseWriteRowNum = 0;
dieWhileDataOver = false;
queue.clear();
}

/**
* 往Hbase写入行数据,如果多线程模式,则写入线程队列。否则直接操作HTABLE
*
* @param rowData
* @throws java.io.IOException
*/
@Override
public void writeRow(HBaseRowData rowData) throws IOException {
hbaseWriteRowNum++;
Put put = rowData.toPut();
//put.setWriteToWAL(false);
if (enableThread) {
addPut(put);
} else
getHTable().mutate(put);
}

@Override
public String getTableName() {
return tableName;
}

@Override
public void close() throws IOException, InterruptedException {
if (enableThread) {
this.setDieWhileDataOver(true);
this.join();
}
//table.flushCommits();
table.close();
//TODO:table.close();
//HBaseTabler tabler = new HBaseTabler(context);
//tabler.flushTable(this.getTableName());
}

/**
* 将put加入输出队列
* 如果队列个数大于指定个数(500),则等待。小于时,则加入队列
* 解决在处理大数据表时,内存无限占用的问题(输出速度跟不上输入速度,造成队列太大)
*
* @param put
*/
public void addPut(Put put) {
while (true) {
if (heapLimitSize.get() < 104857600) {// 352,986,752 100M
if (!queue.offer(put)) {
PickerLog.logError(Bytes.toStringBinary(put.getRow()) + " addPut error occurs.");
}
heapLimitSize.addAndGet(put.heapSize());
break;
} else
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

public void setDieWhileDataOver(boolean dieWhileDataOver) {
this.dieWhileDataOver = dieWhileDataOver;
}

/**
* 线程执行体, 一直取队列数据往HTable里面写入
*/
public void run() {
List<Throwable> exceptions = new ArrayList<Throwable>();
int rowNum = 0;
int rowFailed = 0;
while (!Thread.currentThread().isInterrupted()) {
Put put = queue.poll();
if (put != null) {
try {
this.getHTable().mutate(put);
rowNum++;
heapLimitSize.addAndGet(-put.heapSize());
putSize.addAndGet(put.heapSize());
} catch (Exception e) {
rowFailed++;
if(rowFailed < 100)
exceptions.add(e);
}
} else {
if (dieWhileDataOver) {
break;
}
}
}
PickerMonitor.setDestTotal(hbaseWriteRowNum);
PickerMonitor.setDestSucc(rowNum);
PickerMonitor.setPutSize(putSize.get());
if (exceptions.size() > 0){
MultiException exp = new MultiException(exceptions, hbaseWriteRowNum - rowNum, hbaseWriteRowNum);
PickerLog.logError("error occurs while put table data: ", exp);
}
}

public BufferedMutator getHTable() {
return table;
}

}

HBaseClient获取BufferedMutator对象方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 获取异步写操作对象
* @param tableName 表名称
* @param writeBufferSize 写数缓冲大小
* @return Mutator
* @throws IOException
*/
public BufferedMutator getMutator(String tableName, int writeBufferSize) throws IOException{
tableName = addNamespace(tableName);
BufferedMutatorParams bufferedMutatorParams =
new BufferedMutatorParams(TableName.valueOf(Bytes.toBytes(tableName)));
bufferedMutatorParams.writeBufferSize(writeBufferSize);
return connection.getBufferedMutator(bufferedMutatorParams);
}

线程队列缓冲区

注意,此Writer类中对于多线程写入场景,为了避免连接数膨胀和稳定性,基于ConcurrentLinkedQueue<Put>创建了一个线程队列缓冲区属性,所有写入线程都必须先将数据放入该队列中,然后统一使用一个线程去进行单线程写入。为了防止读取速度比写HBase速度快,造成内存膨胀超限,还设置了缓存队列最大长度,当队列超长需要阻塞等待,起到了削峰限流的作用。

这种基于线程安全队列实现的缓冲区也可以用于限流、秒杀、锁等要求线程安全的场景。

参考文献

Hbase源码系列之BufferedMutator的Demo和源码解析