Adapting the open-source HBase Spark Connector to support custom timestamps and cell TTL

1. Background

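Previously we used MapReduce to read rows from the HDFS source files of a Hive table, transform them, and write them to HBase. With that approach, calling HFileOutputFormat2.configureIncrementalLoad() is enough to get the sorting Reducer that HBase already provides. The drawback is that the MapReduce job has to read the Hive table's source files directly: it cannot filter or aggregate first and then transform and push the data, which in many cases wastes Hive storage. Looking for an open-source alternative, I found that Spark can run the SQL, put the Spark SQL result into a DataFrame, and then transform and push the data. The hbase-connectors project packages exactly this, so pushing data to HBase with Spark becomes as convenient as with MapReduce.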

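However, the project does not yet let callers set the timestamp and tags of the cells written to HBase. To let different business data expire under different TTLs and thereby avoid wasted storage, I added a method that accepts a custom timestamp and TTL tag per cell and applies them when the HFiles are written.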
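
To make clear why the timestamp and the TTL tag belong together, here is a minimal sketch in plain Scala. It assumes HBase's per-cell rule is "expired once timestamp + TTL is earlier than now", with both values in milliseconds; CellMeta and isExpired are hypothetical names for illustration, not HBase classes.

```scala
object CellTtlSketch {
  // Hypothetical model of one cell: only the fields that matter for expiry.
  final case class CellMeta(timestampMs: Long, ttlMs: Option[Long])

  // Assumed rule: a cell with a TTL tag expires once timestamp + ttl < now.
  // A cell without a TTL tag never expires under this per-cell rule.
  def isExpired(cell: CellMeta, nowMs: Long): Boolean =
    cell.ttlMs.exists(ttl => cell.timestampMs + ttl < nowMs)

  def main(args: Array[String]): Unit = {
    val dayMs = 24L * 60 * 60 * 1000
    val cell = CellMeta(timestampMs = 1000L, ttlMs = Some(7 * dayMs))
    println(isExpired(cell, nowMs = 1000L + 3 * dayMs)) // 3 days old, TTL 7 days
    println(isExpired(cell, nowMs = 1000L + 8 * dayMs)) // older than its TTL
  }
}
```

Under this rule a backfilled cell written with an old business timestamp starts its TTL countdown from that timestamp, not from the load time, which is why both values need to be settable per cell.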

2. Code changes

2.1 Add a cell wrapper class

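The project defines a FamiliesQualifiersValues class that describes how the cells of one row are stored and sorted. Following the same pattern, create a FamiliesQualifiersValuesWithTimeStampAndTimeToLive class that adds timeStamp and tags to each cell: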

package org.apache.hadoop.hbase.spark

import java.util

import org.apache.hadoop.hbase.Tag
import org.apache.yetus.audience.InterfaceAudience

/**
 * This object is a clean way to store and sort all cells that will be bulk
 * loaded into a single row. Each cell carries its own timestamp and tags.
 */
@InterfaceAudience.Public
class FamiliesQualifiersValuesWithTimeStampAndTimeToLive extends Serializable {
  // Tree maps are used because we need the results to
  // be sorted when we read them
  val familyMap = new util.TreeMap[ByteArrayWrapper,
    util.TreeMap[ByteArrayWrapper, (Array[Byte], Long, Array[Tag])]]()

  // Normally a row has more columns than column families,
  // so this wrapper is reused for column family lookups
  val reusableWrapper = new ByteArrayWrapper(null)

  /**
   * Adds a new cell to an existing row
   * @param family HBase column family
   * @param qualifier HBase column qualifier
   * @param value HBase cell value
   * @param timeStamp cell timestamp in milliseconds; 0 falls back to the bulk load's own timestamp
   * @param tags cell tags, for example a TTL tag
   */
  def += (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte],
          timeStamp: Long, tags: Array[Tag]): Unit = {

    reusableWrapper.value = family

    var qualifierValues = familyMap.get(reusableWrapper)

    if (qualifierValues == null) {
      qualifierValues = new util.TreeMap[ByteArrayWrapper, (Array[Byte], Long, Array[Tag])]()
      familyMap.put(new ByteArrayWrapper(family), qualifierValues)
    }

    qualifierValues.put(new ByteArrayWrapper(qualifier), (value, timeStamp, tags))
  }

  /**
   * A wrapper for the "+=" method above, can be used from Java
   * @param family HBase column family
   * @param qualifier HBase column qualifier
   * @param value HBase cell value
   * @param timeStamp cell timestamp in milliseconds; 0 falls back to the bulk load's own timestamp
   * @param tags cell tags, for example a TTL tag
   */
  def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte],
          timeStamp: Long, tags: Array[Tag]): Unit = {
    this += (family, qualifier, value, timeStamp, tags)
  }
}
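
The reason for the nested tree maps may be easier to see in isolation. The following is only a sketch in plain Scala: BytesKey is a hypothetical stand-in for the connector's ByteArrayWrapper (assumed to order byte arrays lexicographically as unsigned bytes), and the cell payload is simplified to (value, timestamp).

```scala
import java.util

object SortedRowSketch {
  // Hypothetical stand-in for ByteArrayWrapper: orders byte arrays
  // lexicographically, treating each byte as unsigned.
  final class BytesKey(val value: Array[Byte]) extends Comparable[BytesKey] {
    override def compareTo(other: BytesKey): Int = {
      val n = math.min(value.length, other.value.length)
      var i = 0
      var result = 0
      while (i < n && result == 0) {
        result = (value(i) & 0xff) - (other.value(i) & 0xff)
        i += 1
      }
      if (result != 0) result else value.length - other.value.length
    }
  }

  private def key(s: String) = new BytesKey(s.getBytes("UTF-8"))

  // Inserts (family, qualifier, value, timestamp) cells in any order and
  // returns "family:qualifier" names in the order the maps iterate them.
  def columnOrder(cells: Seq[(String, String, String, Long)]): List[String] = {
    val familyMap = new util.TreeMap[BytesKey, util.TreeMap[BytesKey, (String, Long)]]()
    cells.foreach { case (family, qualifier, value, ts) =>
      var qualifiers = familyMap.get(key(family))
      if (qualifiers == null) {
        qualifiers = new util.TreeMap[BytesKey, (String, Long)]()
        familyMap.put(key(family), qualifiers)
      }
      qualifiers.put(key(qualifier), (value, ts))
    }
    val out = List.newBuilder[String]
    val familyIt = familyMap.entrySet().iterator()
    while (familyIt.hasNext) {
      val familyEntry = familyIt.next()
      val qualifierIt = familyEntry.getValue.keySet().iterator()
      while (qualifierIt.hasNext) {
        out += new String(familyEntry.getKey.value, "UTF-8") + ":" +
          new String(qualifierIt.next().value, "UTF-8")
      }
    }
    out.result()
  }
}
```

Whatever order the cells are added in, iteration comes back sorted by family and then by qualifier, which is the order an HFile writer expects within a row.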

2.2 Add the corresponding bulk load method to HBaseRDDFunctions

The bulk load methods most commonly used in HBaseRDDFunctions are hbaseBulkLoad and hbaseBulkLoadThinRows. Add a matching hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive method whose mapFunction returns FamiliesQualifiersValuesWithTimeStampAndTimeToLive instead:

// Delegates to HBaseContext; rdd is the RDD wrapped by the enclosing implicit class
def hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive(hc: HBaseContext,
    tableName: TableName,
    mapFunction: (T) =>
      (ByteArrayWrapper, FamiliesQualifiersValuesWithTimeStampAndTimeToLive),
    stagingDir: String,
    familyHFileWriteOptionsMap:
      util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
  hc.bulkLoadThinRowsWithTimeStampAndTimeToLive(rdd, tableName,
    mapFunction, stagingDir, familyHFileWriteOptionsMap,
    compactionExclude, maxSize)
}
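
A call site might then look roughly like the sketch below. It assumes the patched connector from this article is on the classpath; the SQL, table names, column family, staging path, and the seven-day TTL are all hypothetical. ArrayBackedTag and TagType are, as far as I know, marked as private-audience classes in HBase 2.x, so check them against your version.

```scala
import org.apache.hadoop.hbase.{ArrayBackedTag, HBaseConfiguration, TableName, Tag, TagType}
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValuesWithTimeStampAndTimeToLive, HBaseContext}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{Row, SparkSession}

object BulkLoadWithTtlExample {
  private val Family = Bytes.toBytes("cf")            // hypothetical column family
  private val SevenDaysMs = 7L * 24 * 60 * 60 * 1000  // hypothetical TTL, in milliseconds

  // Turns one (id, name, event_ts) row into a row key plus its cells.
  def toCells(row: Row): (ByteArrayWrapper, FamiliesQualifiersValuesWithTimeStampAndTimeToLive) = {
    // The TTL tag value is the TTL in milliseconds, encoded as a long.
    val ttlTag: Tag = new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(SevenDaysMs))
    val cells = new FamiliesQualifiersValuesWithTimeStampAndTimeToLive
    cells.add(Family, Bytes.toBytes("name"), Bytes.toBytes(row.getString(1)),
      row.getLong(2), Array(ttlTag))
    (new ByteArrayWrapper(Bytes.toBytes(row.getString(0))), cells)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hbase-bulkload-ttl")
      .enableHiveSupport()
      .getOrCreate()
    val hbaseContext = new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

    // Filter in SQL first, then map each row and write the HFiles.
    spark.sql("SELECT id, name, event_ts FROM demo_db.demo_table WHERE dt = '2024-01-01'")
      .rdd
      .hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive(
        hbaseContext,
        TableName.valueOf("demo_table"),
        toCells,
        "/tmp/hbase-staging/demo_table")

    spark.stop()
  }
}
```

As with the connector's existing bulk load methods, this call only writes HFiles under the staging directory; loading them into the table is a separate step.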

2.3 Implement the corresponding bulk load method in HBaseContext

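HBaseContext holds the actual implementation of bulkLoadThinRows and the related methods. The implementation behind hbaseBulkLoadThinRowsWithTimeStampAndTimeToLive, named bulkLoadThinRowsWithTimeStampAndTimeToLive, is added to the same class: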

def bulkLoadThinRowsWithTimeStampAndTimeToLive[T](rdd: RDD[T],
    tableName: TableName,
    mapFunction: (T) =>
      (ByteArrayWrapper, FamiliesQualifiersValuesWithTimeStampAndTimeToLive),
    stagingDir: String,
    familyHFileWriteOptionsMap:
      util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
  val stagingPath = new Path(stagingDir)
  val fs = stagingPath.getFileSystem(config)
  if (fs.exists(stagingPath)) {
    throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
  }
  val conn = HBaseConnectionCache.getConnection(config)
  try {
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    if (startKeys.length == 0) {
      logInfo("Table " + tableName.toString + " was not found")
    }
    // Default to ZSTD when "hfile.compression" is not set (see section 2.4)
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.ZSTD.getName)
    val defaultCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    // Fallback timestamp shared by every cell that does not set its own
    val nowTimeStamp = System.currentTimeMillis()
    val tableRawName = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    // This is where all the magic happens
    // Here we are going to do the following things
    // 1. Map every row in the RDD into a (row key, cells) pair
    // 2. Then we are going to repartition, sort, and shuffle
    // 3. Finally we are going to write out our HFiles
    rdd.map(r => mapFunction(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {

        val conf = broadcastedConf.value.value
        val fs = new Path(stagingDir).getFileSystem(conf)
        val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
        var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
        var rollOverRequested = false
        val localTableName = TableName.valueOf(tableRawName)

        // Here is where we finally iterate through the data in this partition of the
        // RDD that has been sorted and partitioned
        it.foreach { case (rowKey: ByteArrayWrapper,
          familiesQualifiersValuesWithTimeStampAndTimeToLive: FamiliesQualifiersValuesWithTimeStampAndTimeToLive) =>

          if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
            throw new KeyAlreadyExistsException("The following key was sent to the " +
              "HFile load more than once: " + Bytes.toString(previousRow))
          }

          // The family map is a tree map so the families will be sorted
          val familyIt = familiesQualifiersValuesWithTimeStampAndTimeToLive.familyMap.entrySet().iterator()
          while (familyIt.hasNext) {
            val familyEntry = familyIt.next()

            val family = familyEntry.getKey.value

            val qualifierIt = familyEntry.getValue.entrySet().iterator()

            // The qualifier map is a tree map so the qualifiers will be sorted
            while (qualifierIt.hasNext) {

              val qualifierEntry = qualifierIt.next()
              val qualifier = qualifierEntry.getKey
              val cellValue = qualifierEntry.getValue._1
              val cellTimeStamp = qualifierEntry.getValue._2
              val cellTags = qualifierEntry.getValue._3

              writeValueToHFileWithTimeStampAndTimeToLive(rowKey.value,
                family,
                qualifier.value, // qualifier
                cellValue, // value
                cellTimeStamp,
                cellTags,
                nowTimeStamp,
                fs,
                conn,
                localTableName,
                conf,
                familyHFileWriteOptionsMapInternal,
                defaultCompression,
                writerMap,
                stagingDir)

              previousRow = rowKey.value
            }

            writerMap.values.foreach(wl => {
              rollOverRequested = rollOverRequested || wl.written > maxSize

              // This will only roll if we have at least one column family file that is
              // bigger than maxSize and we have finished a given row key
              if (rollOverRequested) {
                rollWriters(fs, writerMap,
                  regionSplitPartitioner,
                  previousRow,
                  compactionExclude)
                rollOverRequested = false
              }
            })
          }
        }

        // We have finished all the data, so close the writers
        rollWriters(fs, writerMap,
          regionSplitPartitioner,
          previousRow,
          compactionExclude)
        rollOverRequested = false
      })
  } finally {
    if (null != conn) conn.close()
  }
}

private def writeValueToHFileWithTimeStampAndTimeToLive(rowKey: Array[Byte],
    family: Array[Byte],
    qualifier: Array[Byte],
    cellValue: Array[Byte],
    timeStamp: Long,
    tags: Array[Tag],
    nowTimeStamp: Long,
    fs: FileSystem,
    conn: Connection,
    tableName: TableName,
    conf: Configuration,
    familyHFileWriteOptionsMapInternal:
      util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
    hfileCompression: Compression.Algorithm,
    writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
    stagingDir: String
    ): WriterLength = {

  // Get the writer for this column family, creating it on first use
  val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
    val familyDir = new Path(stagingDir, Bytes.toString(family))

    familyDir.getFileSystem(conf).mkdirs(familyDir)

    val loc: HRegionLocation = {
      try {
        val locator =
          conn.getRegionLocator(tableName)
        locator.getRegionLocation(rowKey)
      } catch {
        case e: Throwable =>
          logWarning("there's something wrong when locating rowkey: " +
            Bytes.toString(rowKey))
          null
      }
    }
    if (null == loc) {
      if (log.isTraceEnabled) {
        logTrace("failed to get region location, so use default writer: " +
          Bytes.toString(rowKey))
      }
      getNewHFileWriter(family = family,
        conf = conf,
        favoredNodes = null,
        fs = fs,
        familydir = familyDir,
        familyHFileWriteOptionsMapInternal,
        hfileCompression)
    } else {
      if (log.isDebugEnabled) {
        logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
      }
      val initialIsa =
        new InetSocketAddress(loc.getHostname, loc.getPort)
      if (initialIsa.isUnresolved) {
        if (log.isTraceEnabled) {
          logTrace("failed to resolve bind address: " + loc.getHostname + ":"
            + loc.getPort + ", so use default writer")
        }
        getNewHFileWriter(family,
          conf,
          null,
          fs,
          familyDir,
          familyHFileWriteOptionsMapInternal,
          hfileCompression)
      } else {
        if (log.isDebugEnabled) {
          logDebug("use favored nodes writer: " + initialIsa.getHostString)
        }
        getNewHFileWriter(family,
          conf,
          Array[InetSocketAddress](initialIsa),
          fs,
          familyDir,
          familyHFileWriteOptionsMapInternal,
          hfileCompression)
      }
    }
  })

  // A timestamp of 0 means "not set": fall back to the bulk load's own timestamp
  val effectiveTimeStamp = if (timeStamp != 0L) timeStamp else nowTimeStamp

  // This KeyValue constructor takes the custom timestamp and the tags (e.g. a TTL tag)
  val keyValue = new KeyValue(rowKey,
    family,
    qualifier,
    effectiveTimeStamp, cellValue, tags)

  wl.writer.append(keyValue)
  wl.written += keyValue.getLength

  wl
}
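
The timestamp rule at the end of this method is small but easy to overlook, so here it is isolated as a minimal sketch in plain Scala. effectiveTimestamp is a hypothetical helper name; the rule itself (0 means "not set") is the one used above.

```scala
object TimestampFallbackSketch {
  // Mirrors the rule in writeValueToHFileWithTimeStampAndTimeToLive:
  // 0 is treated as "not set" and replaced by the bulk load's own timestamp.
  def effectiveTimestamp(cellTimeStamp: Long, nowTimeStamp: Long): Long =
    if (cellTimeStamp != 0L) cellTimeStamp else nowTimeStamp

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    println(effectiveTimestamp(1700000000000L, now)) // keeps the custom timestamp
    println(effectiveTimestamp(0L, now) == now)      // falls back to the load timestamp
  }
}
```

Two consequences follow from the code above: a cell cannot be written with a timestamp of exactly 0, and because nowTimeStamp is read once before the RDD is processed, every cell that leaves its timestamp unset gets the same value.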

2.4 Change the default compression format

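If the push component does not specify a compression format, the data is still usable once it has been pushed to HBase, and it is recompressed with the compression format configured in HBase after compaction. But loading the HFiles is then very slow, and HBase cluster storage balloons in the short term. Setting ZSTD in the push component compresses the data while the HFiles are being written, which greatly improves efficiency and relieves the storage pressure; for example, the ZSTD-compressed files are only about one tenth the size of the uncompressed ones. The open-source component defaults to no compression, so the code is changed to default to ZSTD, in bulkLoadThinRowsWithTimeStampAndTimeToLive():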

val defaultCompressionStr = config.get("hfile.compression",
  Compression.Algorithm.ZSTD.getName)
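
Because ZSTD is only the fallback of a config lookup, a job can still choose another codec. The sketch below assumes hadoop-common is on the classpath and that Compression.Algorithm.ZSTD.getName returns "zstd"; effectiveCompression is a hypothetical helper that repeats the lookup with that literal.

```scala
import org.apache.hadoop.conf.Configuration

object CompressionDefaultSketch {
  // Same lookup as in the patched method, with the default written as the
  // literal "zstd" (assumed to equal Compression.Algorithm.ZSTD.getName).
  def effectiveCompression(conf: Configuration): String =
    conf.get("hfile.compression", "zstd")

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false) // empty config, no default resources loaded
    println(effectiveCompression(conf)) // nothing set, so the new default applies

    // A job that wants another codec sets the key before creating its HBaseContext.
    conf.set("hfile.compression", "snappy")
    println(effectiveCompression(conf))
  }
}
```

The codec chosen this way still has to be available on the Spark executors that write the HFiles.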