HBase_HBase的批量写工具类BufferedMutator原理与应用实战
1.背景
如果未开启HBase客户端api的缓冲区,那么一次put就是一个RPC操作,将客户端数据传输到服务器再等待返回结果,这非常耗时,小数据量传输写入还好,如果数据量多,每一个put都建立一次RPC连接和数据传输非常耗时。为解决该问题,HBase客户端推出了缓冲区配置,设置HTable.setAutoFlush(false)来开启缓冲区,只有当put到达一定容量后或者用户手动触发flush()时才会将缓冲区数据一次性提交到HBase服务器。但是HBase的api还是存在需要同步等待、最后关闭连接前手动触发flush()等问题。为了解决该问题,HBase推出了BufferedMutator,基本原理与缓冲区类似,可以避免频繁的RPC调用,还实现了异步提交、close()前自动flush()等功能。
2.BufferedMutator介绍
2.1 参数介绍
TableName(表名)
writeBufferSize(写缓存大小)
maxKeyValueSize(最大key-value大小)
ExecutorService(执行线程池)
ExceptionListener(监听BufferedMutator的异常)。
2.2 调用过程
1.构建put或者List[put]
2.调用BufferedMutator.mutate()方法
3.刷写到hbase
主要有三种触发方式:
1)显式调用BufferedMutator.flush()
2)发送结束的时候调用BufferedMutator.close()
3)它根据当前缓存大于了设置的写缓存大小,自动刷写:
1 | while (undealtMutationCount.get() != 0 && currentWriteBufferSize.get() > writeBufferSize) { |
这三种触发方式最终都是调用的backgroundFlushCommits方法。
2.3 使用场景
MapReduce Job的是BufferedMutator使用的典型场景。MapReduce作业需要批量写入,但是无法找到恰当的点执行flush。BufferedMutator接收MapReduce作业发送来的Put数据后,会根据writeBufferSize自动执行执行Batch Put操作,且会异步的提交Batch Put请求,这样MapReduce作业的执行也不会被打断,且MapReduce作业的每个线程将会拥有一个独立的BufferedMutator对象。
一个独立的BufferedMutator也可以用在大容量的在线系统上来执行批量Put操作,但是这时需要注意一些极端情况比如JVM异常或机器故障,此时有可能造成数据丢失。
3.商智数仓出数到HBase工具实战
HBaseTableWriter类:
1 | public class HBaseTableWriter extends Thread implements ITableWriter { |
HBaseClient获取BufferedMutator对象方法:
1 | /** |
线程队列缓冲区
注意,此Writer类中对于多线程写入场景,为了避免连接数膨胀和稳定性,基于ConcurrentLinkedQueue<Put>创建了一个线程队列缓冲区属性,所有写入线程都必须先将数据放入该队列中,然后统一使用一个线程去进行单线程写入。为了防止读取速度比写HBase速度快,造成内存膨胀超限,还设置了缓存队列最大长度,当队列超长需要阻塞等待,起到了削峰限流的作用。
这种基于线程安全队列实现的缓冲区也可以用于限流、秒杀、锁等要求线程安全的场景。