HBase_HBase自定义带版本因子timestamp与行粒度TTL专利与实战代码

HBase_HBase自定义带版本因子timestamp与行粒度TTL专利与实战代码

一、专利撰写

一种自定义行粒度生命周期的HBase表大批量数据写入方法

技术领域

本发明属于HBase数据库技术领域,具体涉及一种为每一行数据设置不同timestamp和cell TTL(time to live)的HBase表大批量数据写入方法。

背景技术

HBase是一种非关系型数据库,具有高性能、高可靠性、分布式、可伸缩等特点,非常适合存储非结构化数据。HBase数据库构建于Hadoop分布式文件系统之上,在物理存储上借助列簇实现KV(Key-Value)数据的存储,底层通过HDFS副本机制实现数据的冗余存储,具有极其出色的读写性能。

HBase支持列簇粒度TTL和单元格粒度TTL。TTL的作用原理:系统当前毫秒级时间戳currentMilliseconds减去单元格时间戳timestamp 的差值大于TTL,则判定该单元格过期,无法被查询返回,在数据合并时被删除。列簇粒度TTL和单元格粒度TTL可以同时设置,系统按照最小的TTL来判断单元格数据是否过期。

数据应用平台为了应对高并发场景下的数据查询请求压力,通常将明细数据按照设定的维度路径和指标口径进行预计算后以KV的形式存储在HBase数据库中。为了提供更加灵活和全面的数据展现形式,对于具有固定业务口径的一块数据看版,通常会提供单天、周至今、近7天、小时等不同聚合时间粒度的预计算数据供筛选切换,也会提供类目、品牌、商品等不同聚合维度路径的预计算数据供筛选切换。将这些时间粒度、维度路径等具有不同业务含义的聚合数据变化情况与营销、推荐等策略进行结合分析,即可帮助数据应用平台用户更加快捷、方便地了解数据影响因素,针对性地制定数据表现提升方案。

数据应用平台使用多种多样的聚合时间粒度和维度路径预计算数据为用户提供灵活的时间范围和聚合维度选项,这些具有相同指标口径的数据通常存储在同一张HBase表中,只需要根据传入的聚合时间粒度类型和聚合维度枚举,即可区分具有不同业务含义的数据。针对该HBase表只需要在数据服务中建立一个数据模型,通过传入不同聚合维度组合和对应维度过滤值即可方便快捷地获取对应聚合数据,实现了模型口径唯一、服务开发便捷、高效查询、响应速度快等优点。但是在实际业务过程中,同一张HBase表中具有不同业务含义的预计算结果数据通常具有不同历史周期查询优先级,如部分业务数据历史周期的查询概率较高则需要存储较长周期,而部分业务数据历史周期的完全不会被查询到,随着业务模块的不断增加,HBase集群占用存储资源增长迅速,集群无法进行无限制的扩容;且每次新上业务需求进行历史数据回溯都会造成HBase数据生命周期计算起点时间戳timestamp刷新,导致历史数据无法按照预设生命周期及时过期清除,需要再等待一整个HBase列簇粒度TTL才能被真正清理。

鉴于上述HBase数据库业务增长与存储现状,有必要针对存储无限制增长和数据冗余严重问题,在满足用户看数需求和数据存储成本中寻找一个平衡点,抑制存储增长,进行存储治理。同一张HBase表中具有不同业务含义的数据可以设置不同TTL,业务调整带来的历史回溯数据也应该按照业务时间正常过期。因此,设计一种针对同一张HBase表中不同业务含义数据行设置不同timestamp和cell TTL的大批量数据写入方法,显得十分迫切和重要。

发明内容

本发明的目的是克服现有技术的不足,提供一种针对同一张HBase表中不同业务含义数据行设置不同timestamp和cell TTL的大批量数据写入方法,该方法能够有效缓解HBase集群存储无限制增长和数据冗余严重的问题。

为了克服现有技术问题,本发明是通过以下技术方案实现的:

1)一种针对同一张HBase表中不同业务含义数据行设置不同timestamp和cell TTL的大批量数据写入方法,特征在于数据集市应用层按照各种业务维度组合和聚合时间粒度预计算得到各个不同业务层级的结果数据,存放在Hive表中,采用MapReduce计算引擎并发读取Hive结果表的数据文件。

2)按上述技术方案,所述MapReduce计算任务的Map阶段,每个Task读取一个Hive表存储文件,逐行读取Hive表存储文件中Text格式的预计算结果数据行,解析出字段数据并计数。

3)按上述技术方案,所述MapTask解析提取出能够标识结果数据行唯一性的所有聚合维度字段值,这些维度值按照指定顺序拼接在一起能够具有唯一的业务含义。

4)按上述技术方案,将所述能够标识数据行唯一性的聚合维度字段值按照从低基维度到高基维度的顺序排列,并将首个维度字段值进行反转散列再拼接,作为该数据行的Rowkey。

5)按上述技术方案,所述MapTask解析提取出业务生产时间datetime,并将其转换为毫秒级Unix timestamp,作为预计算结果数据行的业务生产时间戳。

6)按上述技术方案,将所述业务生产时间戳加上版本因子,作为该数据行的timestamp,所述timestamp计算公式如下:img)式中img)为上述业务生产时间戳,img)为推数任务运行时系统时间戳,img为一个自然天的毫秒数。

7)按上述技术方案,所述MapTask解析提取出能够标识结果数据行在历史周期中业务新鲜度的维度字段值,根据这些具有不同业务含义的维度值确定该数据行在历史周期中的存储优先级。

8)按上述技术方案,对上述历史存储优先级较高的数据行的设置较长的TTL,对上述历史存储优先级较低的数据行设置较短的TTL,将TTL封装为KeyValue对象的Tag属性。

9)按上述技术方案,将所述Rowkey、timestamp、TTL Tag作为对象属性组装成HBase的一个Put对象,一个HBase数据库格式的数据行可以包含具有相同Rowkey、timestamp、TTL Tag属性和不同value属性的多个Put对象。

10)按上述技术方案,所述MapReduce计算任务的Shuffle阶段,将Map阶段解析处理好的数据行根据Rowkey按照目标HBase表的Region切分点进行切分,发送到不同的Reduce节点上。

11)按上述技术方案,所述MapReduce计算任务的Reduce阶段,每个Task将其所收到的所有数据行根据Rowkey按照字典序排序,通过检验排序后相邻数据行的Rowkey是否相等来校验每个数据行的业务含义唯一性,当存在相邻数据行Rowkey相等则抛出异常,停止运行计算任务。

12)按上述技术方案,将所述解析处理、切分和排序判重完成后的HBase格式数据行按照指定压缩格式存入HFile文件中,可以通过参数指定ZSTD、BZIP2、LZ4等压缩格式。

13)按上述技术方案,将所述压缩HFile文件存储在HDFS集群的临时目录中,使用BulkLoad方式将所述HFile文件推送到目标HBase集群存储目录中。

14)按上述技术方案,将所述大批量数据写入方法将所述MapReduce计算任务Map阶段统计的解析数据行数与所述数据集市Hive表中数据行数进行比较,校验写入总行数成功后放开数据服务查询请求。

本发明提供的自定义行粒度生命周期的HBase表大批量数据写入方法与现有技术相比,具有以下有益效果:将组成Rowkey的首个业务字段进行反转散列并按从低基到高基维度排序再拼接,实现了HBase数据的均匀散列、避免聚集,同时还能实现灵活的业务维度组合前缀匹配查询;提取业务生产时间作为数据单元格timestamp,避免业务数据回溯时按照数据更新时间重新计算TTL造成数据存储冗余;为业务生产时间timestamp添加版本因子,确保数据单元格在更新前后业务生产时间不变的情况下,更新后的timestamp大于更新前的timestamp,避免数据更新失效;为同一张HBase表中具有不同业务含义的数据添加不同的生命周期,将不会再被使用的历史数据提前清理,缓解存储资源紧张问题;通过检验有序相邻数据行的Rowkey是否相等来确保每个数据行的业务唯一性,方便快速发现上游数据问题和响应;采用ZSTD等压缩格式存储HFile文件并直接推送到HBase集群存储目录中,传输速度快,占用带宽、存储资源少;比对写入目标HBase表数据行数和源Hive表数据行数,确保数据写入无遗漏。

附图说明

图1为本发明的整体流程示意图。

图2为本发明所述的HBase单元格自定义组装的属性。

图1:

图2:

二、实战代码

1.Spark2HBase推数工具

注意此处目前在实际代码中有三处与上述专利内容存在差异:

1)此处使用Spark引擎替换MapReduce引擎,计算效率更高,且可以先进行逻辑运算再将存储在内存中的RDD数据直接进行解析转化写入HBase,而MapReduce引起只能直接读取Hive表数据文件;

2)此处没有给timestamp加上版本因子,是线上使用自定义timestamp然后进行数据回刷之后发现数据没有更新才暴露出的这个问题,后续才想到版本因子这个解法,但是还没真正写到代码中并应用到线上;

3)此处的rowkey没有采用首个业务字段反转的散列方式,而是采用前缀字段MD5的方式进行散列,数据均匀性好很多。

实战代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
object Spark2HBase {
val SEPARATOR: String = "|"
val COMMA: String = ","
val EMPTY: String = ""
val DATEFORMAT: String = "yyyy-MM-dd HH:mm:ss"
val DATESUFFIX: String = " 23:59:59"

def main(args: Array[String]): Unit = {

//指标与id --uv:392,pv:123
val indicationString = args(0)
//维度与排序后id --123:stat_ct_cd,234:shop_id,240:sku_id,246:chan_cd
val dimensionString = args(1)
//时间字段名称 --dt
val time = args(2)
//时间粒度字段名称 --tp
val timeInterval = args(3)
//时效类型 --offtime
val timeAgingType = args(4)
//分时字段名称 --time_granularity
val timeSplitField = args(5)
//预计算sql
val sparkSql = args(6)
//accesskey --NA4CH6B7RFYAJLV3ZMMAHDFGO4
val accesskey = args(7)
//instanceName --CL1000000005186
val instanceName = args(8)
//目标hbase表名
val hbaseTableName = args(9)
//hfile临时存储路径 --hdfs://ns3/user/mart_sz/ads_zs/app_zs_z1702_product_compete_source_deletetest/dt=2023-06-01/stat_ct_cd=day/sz_elaHB_IBD_SHOP_VIEW_INSHOP_NEW
val outputPath = args(10)
//源数据表存储路径根目录 --hdfs://ns3
val inputRootPath = args(11)
//是否开启按业务周期写入timestamp标识
var customTimestampFlag = args(12).toBoolean
//是否开启对d7,d30,非周末tw,非月末tm预计算数据单独设置ttl标识
var cellTimeToLiveFlag = args(13).toBoolean
//单独设置ttl寿命,单位为毫秒
val cellTimeToLive = args(14).toLong * 24 * 60 * 60 * 1000
//rowkey首字段散列方式
val hashFunction = args(15)
//scan维度字段 --shop_id,sku_id
val scanFields = args(16)
//生产预发环境标识 --PROD/PRE
val multiEnv = args(17)
//sparkSQL的partition数量
val partitions = args(18).toInt

//初始化spark环境
val bld7 = "BY_LAST_DAYS_7"
val bld30 = "BY_LAST_DAYS_30"
val bma = "BY_MONTH_ACCU"
val bwa = "BY_WEEK_ACCU"
val bm = "BY_MONTH"
val bw = "BY_WEEK"
//周至今、月至今、近7、近30的同比数据与当期数据同时写入,如果要写入cellttl,则不开启按业务周期写入timestamp标识
if ((sparkSql.contains(bld30) || sparkSql.contains(bma) || sparkSql.contains(bld7) || sparkSql.contains(bwa)) && cellTimeToLiveFlag) {
cellTimeToLiveFlag = true
customTimestampFlag = false
} else {
cellTimeToLiveFlag = false
}

println(s"指标与id:$indicationString")
println(s"维度与排序后id:$dimensionString")
println(s"时间字段名称:$time")
println(s"时间粒度字段名称:$timeInterval")
println(s"时效类型:$timeAgingType")
println(s"分时字段名称:$timeSplitField")
println(s"预计算sql:$sparkSql")
println(s"accesskey:$accesskey")
println(s"instanceName:$instanceName")
println(s"目标hbase表名:$hbaseTableName")
println(s"hfile临时存储路径:$outputPath")
println(s"源数据表存储路径根目录:$inputRootPath")
println(s"是否开启按业务周期写入timestamp标识:$customTimestampFlag")
println(s"是否开启对d7,d30,非周末tw,非月末tm预计算数据单独设置ttl标识:$cellTimeToLiveFlag")
println(s"单独设置ttl寿命:$cellTimeToLive")
println(s"rowkey首字段散列方式:$hashFunction")
println(s"scan维度字段:$scanFields")
println(s"sparkSQL的partition数量:$partitions")

val sparkSession = SparkSession.builder().
appName("hdfs 2 hbase through spark").
config("spark.sql.shuffle.partitions", partitions).
config("spark.executor.extraJavaOptions", "-XX:MaxHeapSize=4G -XX:ParallelGCThreads=2 -XX:MaxJavaStackTraceDepth=1000000").
enableHiveSupport().getOrCreate();
val spark = sparkSession
val sc = spark.sparkContext
val accumulator = sc.longAccumulator("RowCountor")

//获取指标-id映射map
val indicationList = indicationString.split(",")
val indicatorMap = new mutable.HashMap[String, String]
for (element <- indicationList) {
indicatorMap.put(element.split(":")(0), element.split(":")(1))
}

//获取id-维度映射map
val dimensionList = dimensionString.split(",")
val dimensionMap = new mutable.HashMap[String, String]
for (element <- dimensionList) {
if (!"dt".equals(element.split(":")(1))) {
dimensionMap.put(element.split(":")(0), element.split(":")(1))
}
}

//获取需要拼接到rowkey中的维度id list
val dimensionIdList = new mutable.ListBuffer[String]
for (element <- dimensionList) {
if (!"dt".equals(element.split(":")(1))) {
dimensionIdList.append(element.split(":")(0))
}
}

//获取scan维度字段list
var scanFieldList = new mutable.ListBuffer[String]
if (!"NULL".equals(scanFields)) {
for (element <- scanFields.split(",")) {
scanFieldList.append(element)
}
}

//获取需要从行数据中解析出来的字段,包括指标+维度+时间+时间粒度的set
val indicatorDimensionSet = new mutable.HashSet[String]
for (element <- indicatorMap.keys) {
indicatorDimensionSet.add(element)
}
for (element <- dimensionMap.values) {
indicatorDimensionSet.add(element)
}
indicatorDimensionSet.add(time)
indicatorDimensionSet.add(timeInterval)
indicatorDimensionSet.add(timeSplitField)

//初始化hbase环境
val config = new HBaseConfiguration()
config.set("bdp.hbase.accesskey", accesskey)
config.set("bdp.hbase.instance.name", instanceName)
config.set("fs.defaultFS", inputRootPath)
config.setLong("hbase.rpc.timeout", 14400000)
config.setBoolean("hbase.policy.http.client.v2.enabled", false)
val hbaseContext = new HBaseContext(sc, config)

//执行sparkSQL
var dataFrame: Dataset[Row] = null
dataFrame = spark.sql(sparkSql)
val schema = dataFrame.schema
var sourceRdd = dataFrame.rdd

//如果输出目录存在则删除
val dfs = FileSystem.get(config)
if (dfs.exists(new Path(outputPath))) {
dfs.delete(new Path(outputPath), true)
}

var formatRdd = sourceRdd.map(row => {
accumulator.add(1)

//用于存储拼装key的键值对
var rowMap = new mutable.HashMap[String, String]

//从一行数据中取出需要的字段名和字段值
schema.foreach { f =>
val fieldName = f.name
val fieldIndex = row.fieldIndex(fieldName)
val fieldValue = row.get(fieldIndex)
if (indicatorDimensionSet.contains(fieldName))
if(fieldValue != null){
rowMap.put(fieldName, fieldValue.toString)
}
}

var rowkey: String = rowMap(time) + SEPARATOR + getStandardTimeInterval(rowMap(timeInterval)) + SEPARATOR + timeAgingType + SEPARATOR + getTimeSharingType(rowMap(timeInterval)) + SEPARATOR
for(element1 <- dimensionIdList) {
if(rowMap.contains(dimensionMap(element1))) {
rowkey += element1
rowkey += COMMA
}
}
rowkey = rowkey.substring(0, rowkey.length - 1)
rowkey += SEPARATOR
for(element1 <- dimensionIdList) {
if(rowMap.contains(dimensionMap(element1)) && !scanFieldList.contains(dimensionMap(element1))) {
rowkey += rowMap(dimensionMap(element1))
rowkey += COMMA
}
}
rowkey = rowkey.substring(0, rowkey.length - 1)

rowkey = getHashValue(rowkey, "String", hashFunction) + SEPARATOR + rowkey

//添加scan字段值
for (element1 <- scanFieldList) {
if(rowMap.contains(element1)) {
rowkey += COMMA
rowkey += rowMap(element1)
}
}

//添加分时时间切片字段,格式为 hh:mm,scan字段和分时切片不能同时使用。
if (rowMap.contains(timeSplitField)) {
rowkey += COMMA
rowkey += rowMap(timeSplitField)
}

//预发环境在key头部添加"pre_"前缀
if ("PRE".equals(multiEnv)) {
rowkey = "pre_" + rowkey
}

val familyQualifiersValuesWithTimeStampAndTimeToLive = new FamiliesQualifiersValuesWithTimeStampAndTimeToLive
for(element <- indicatorMap.keys) {
familyQualifiersValuesWithTimeStampAndTimeToLive += (Bytes.toBytes("d"), Bytes.toBytes(element), Bytes.toBytes(if(rowMap.contains(element)) rowMap(element) else ""), getTimestamp(customTimestampFlag, rowMap(time)) , getCellTimeToLive(cellTimeToLiveFlag, cellTimeToLive))
}
(new ByteArrayWrapper(Bytes.toBytes(rowkey)), familyQualifiersValuesWithTimeStampAndTimeToLive)
})

formatRdd.hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive(hbaseContext, TableName.valueOf(hbaseTableName), t => {t}, outputPath)

val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(TableName.valueOf(hbaseTableName))
val load = JDBulkLoadHFiles.create(config)

try {
load.bulkLoad(table.getName, new Path(outputPath))
} catch {
case e:Exception => {
e.printStackTrace()
println(e.getMessage)
System.exit(1)
}
}

if (table != null) {
table.close()
}
if (conn != null) {
conn.close()
}

println("总写入行数:" + accumulator.value)
sparkSession.stop()
println("spark2hbase任务执行完毕!")
//todo spark任务运行完成后,一些线程池连接并未合理关闭,暂时先用主线程强制正常退出来进行任务关闭。
System.exit(0)
}

//将数据按照指定方式进行散列(目前先写死是md5) todo
def getHashValue(value: String, valueType: String, function: String): String = {
var md5: MessageDigest = null
try {
md5 = MessageDigest.getInstance("MD5")
}
catch {
case e: Exception => {
e.printStackTrace()
println(e.getMessage)
}
}
val byteArray: Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
val md5Bytes: Array[Byte] = md5.digest(byteArray)
var hexValue: String = ""
for ( byte <- md5Bytes) {
val str: Int = byte.toInt & 0xff
if (str < 16) {
hexValue=hexValue+"0"
}
hexValue=hexValue+Integer.toHexString(str)
}
hexValue.substring(0, 10)
}

//获取timestamp
def getTimestamp(customTimestampFlag: Boolean, dt: String): Long = {
if(customTimestampFlag) {
val fm = new SimpleDateFormat(DATEFORMAT)
var dateTime = fm.parse(dt + DATESUFFIX)
dateTime.getTime
} else {
0L
}
}

//获取cellTTL todo 非周末周至今、非月末月至今、近7天、近30天此类数据才会加上cellTTL
def getCellTimeToLive(cellTimeToLiveFlag: Boolean, cellTimeToLive: Long): Array[Tag] = {
if(cellTimeToLiveFlag) {
Array[Tag](new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(cellTimeToLive)))
} else {
null
}
}

//获取分时类型,根据tp字段中的聚合周期类型值判断,目前支持的累计类型有:十分累计BY_TEN_MIN_ACCU,小时累计BY_HOUR_ACCU
def getTimeSharingType(aggType: String): String = {
val AccuAggTypeSet = mutable.HashSet("BY_TEN_MIN_ACCU","BY_HOUR_ACCU")
if(AccuAggTypeSet.contains(aggType)) {
"1"
} else {
"0"
}
}

//获取标准聚合周期时间粒度,将BY_TEN_MIN_ACCU、BY_HOUR_ACCU转化为BY_TEN_MIN、BY_HOUR
def getStandardTimeInterval(aggType: String): String = {
if ("BY_TEN_MIN_ACCU".equals(aggType)) {
"BY_TEN_MIN"
} else if ("BY_HOUR_ACCU".equals(aggType)) {
"BY_HOUR"
} else {
aggType
}
}
}