HBase_Spark_Connector开源代码适配自定义timestamp和cellTTL功能
1.背景
之前使用MapReduce实现从Hive表的HDFS源文件中读取行数据,进行转化和写入HBase,使用这种方法时直接使用HFileOutputFormat2.configureIncrementalLoad()方法即可使用HBase封装好的排序Reducer。但是使用这种MapReduce有一个缺点,必须直接读取Hive表源文件,无法做到先进行过滤或聚合计算再进行数据转换和推数,这样在很多情况下会造成hive存储资源的浪费。为了解决这个问题,我寻找开源代码,发现可以通过Spark执行sql,再将SparkSQL执行结果放入DF中,再进行数据转换和推数,hbase-connectors工程就封装实现了该功能,让我们能够像使用MapReduce一样方便快捷地使用Spark进行HBase推数。
但是该开源工程还未实现能够自定义写入HBase数据cell的timestamp和tags参数的功能,为了实现不同业务数据按不同ttl过滤,从而避免存储浪费,现添加一个方法,实现能够自定义timestamp和ttlTag参数并传入生效。
2.代码改造
2.1 添加一个Cell抽象类
该开源工程中定义了一个FamiliesQualifiersValues类,用来标识一个cell的存储和排序方式,对应的创建一个FamiliesQualifiersValuesWithTimeStampAndTimeToLive类,添加timeStamp和tags属性:
1 | package org.apache.hadoop.hbase.spark |
2.2 在HBaseRDDFunctions中添加对应bulkload方法
HBaseRDDFunctions中常用的bulkload方法就是hbaseBulkLoad、hbaseBulkLoadThinRows,现对应新增一个hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive方法,替换入参为FamiliesQualifiersValuesWithTimeStampAndTimeToLive:
1 | def hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive(hc: HBaseContext, |
2.3 在HBaseContext中添加对应bulkload方法实现
在HBaseContext中编写了bulkLoadThinRows等方法的具体实现逻辑,现对应将hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive方法的实现逻辑也新增在该类中:
1 | def bulkLoadThinRowsWithTimeStampAndTimeToLive[T](rdd:RDD[T], |
2.4 修改默认压缩格式
如果在推数组件中不指定压缩格式,数据推送到HBase中之后数据可用,且会在compact之后按HBase设置的压缩格式重新压缩,但是这样会导致hfile文件load速度特别慢,且HBase集群存储短期膨胀非常严重。如果在推数组件中设置好ZSTD,在进行hfile数据写入时就进行数据压缩,可以大大提高效率,缓解存储压力。如ZSTD压缩格式的文件仅为未压缩文件的十分之一大小。该开源组件中默认是不使用压缩格式,现修改代码为默认使用ZSTD压缩格式,就在bulkLoadThinRowsWithTimeStampAndTimeToLive()方法中:
1 | val defaultCompressionStr = config.get("hfile.compression", |