HBase_HBase自带压测工具PerformanceEvaluation改造与应用实战

HBase_HBase自带压测工具PerformanceEvaluation改造与应用实战

1.背景

由于数据产品业务需求增长迅速,各种各样的看数需求和面板被提出,同时低频使用的页面关停并转又不能被各级领导写到汇报ppt中去,那么存量加增量数据看版所需要的HBase存储资源自然就迅速增长。但是在降本增效和技术升级的大背景下,平台也不再对HBase物理集群进行扩容支持,并不断推动物理集群的迁移和下线,推动使用存算分离的HBase容器集群,支持更加快捷的动态扩缩容和灵活的计算存储资源比例。但是部门领导对于存算分离HBase集群的性能存疑,但是想要使用业务请求数据进行压测需要相同的集群规模和等量业务历史数据,数据回溯耗时较长且直接申请与线上集群相同规划的大集群不太现实。那么就需要寻找一种不依赖业务数据的测试方式,能够测试出不同规格或者不同实现架构HBase集群的性能差异,这时我查询资料发现了HBase开源代码中自带了一个压测工具PerformanceEvaluation。

2.PerformanceEvaluation原理

在HBase中,自带了一个benchmark工具PerformanceEvaluation,可以非常方便地对HBase的Put、Get、Scan等API进行性能测试,并提供了非常丰富的参数来模拟各种场景。PerformanceEvaluation的全名是org.apache.hadoop.hbase.PerformanceEvaluation,已经集成在了bin/hbase工具集中,在安装好HBase的机器上,在HBase的安装路径的bin目录下执行hbase pe,加上相应参数,即可运行PE工具。执行命令格式和相关参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
[root@xxxxxx ~]# hbase pe
Usage: java org.apache.hadoop.hbase.PerformanceEvaluation \
<OPTIONS> [-D<property=value>]* <command> <nclients>

General Options:
nomapred 采用MapReduce的方式启动多线程测试还是通过多线程的方式,如果没有安装MapReduce,或者不想用MapReduce,通常我们采用多线程的方式,因此一般在命令中加上--nomapred来表示不使用MapReduce。
rows 每个客户端(线程)运行的行。默认值:一百万。注意这里的行数是指单线程的行数,如果rows=100, 线程数为10,那么在写测试中,写入HBase的将是 100 x 10 行
size 总大小,单位GiB。与--rows互斥。默认值:1.0。
sampleRate 样本比例:对总行数的一部分样本执行测试。只有randomRead支持。默认值:1.0
traceRate 启用HTrace跨度。每N行启动一次跟踪。默认值:0
table 测试表的名字,如果不设,默认为TestTable。
multiGet 如果> 0,则在执行RandomRead时,执行多次获取而不是单次获取。默认值:0
compress 要使用的压缩类型(GZ,LZO,...)。默认值:'无'
flushCommits 该参数用于确定测试是否应该刷新表。默认值:false
writeToWAL 在puts上设置writeToWAL。默认值:True
autoFlush 默认为false,即PE默认用的是BufferedMutator,BufferedMutator会把数据攒在内存里,达到一定的大小再向服务器发送,如果想明确测单行Put的写入性能,建议设置为true。个人觉得PE中引入autoFlush会影响统计的准确性,因为在没有攒够足够的数据时,put操作会立马返回,根本没产生RPC,但是相应的时间和次数也会被统计在最终结果里。
oneCon 多线程运行测试时,底层使用一个还是多个链接。这个参数默认值为false,每个thread都会启一个Connection,建议把这个参数设为True
presplit 表的预分裂region个数,在做性能测试时一定要设置region个数,不然所有的读写会落在一个region上,严重影响性能
inmemory 试图尽可能保持CF内存的HFile。不保证始终从内存中提供读取。默认值:false
usetags 与KV一起写标签。与HFile V3配合使用。默认值:false
numoftags 指定所需的标签号。仅当usetags为true时才有效。
filterAll 通过不将任何内容返回给客户端,帮助过滤掉服务器端的所有行。通过在内部使用FilterAllFilter,帮助检查服务器端性能。
latency 设置为报告操作延迟。默认值:False
bloomFilter Bloom 过滤器类型,[NONE,ROW,ROWCOL]之一
valueSize 写入HBase的value的size,单位是Byte,大家可以根据自己实际的场景设置这个Value的大小。默认值:1024
valueRandom 设置是否应该在0和'valueSize'之间改变值大小;设置读取大小的统计信息:默认值: Not set.
valueZipf 设置是否应该以zipf格式改变0和'valueSize'之间的值大小, 默认值: Not set.
period 报告每个'period'行:默认值:opts.perClientRunRows / 10
multiGet 批处理组合成N组。只有randomRead支持。默认值: disabled
replicas 启用区域副本测试。默认值:1。
splitPolicy 为表指定自定义RegionSplitPolicy。
randomSleep 在每次获得0和输入值之前进行随机睡眠。默认值:0

Note: -D properties will be applied to the conf used.
For example:
-Dmapreduce.output.fileoutputformat.compress=true
-Dmapreduce.task.timeout=60000

Command:
filterScan 使用过滤器运行扫描测试,根据它的值查找特定行(确保使用--rows = 20)
randomRead 运行随机读取测试
randomSeekScan 运行随机搜索和扫描100测试
randomWrite 运行随机写测试
scan 运行扫描测试(每行读取)
scanRange10 使用开始和停止行(最多10行)运行随机搜索扫描
scanRange100 使用开始和停止行运行随机搜索扫描(最多100行)
scanRange1000 使用开始和停止行(最多1000行)运行随机搜索扫描
scanRange10000 使用开始和停止行运行随机搜索扫描(最多10000行)
sequentialRead 运行顺序读取测试
sequentialWrite 运行顺序写入测试

Args:
nclients 整数。必须要有该参数。客户端总数(和HRegionServers)
running: 1 <= value <= 500
Examples:
运行一个单独的客户端:
$ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1

其中几个比较重要的全局参数如下:

  • nomapred:采用MapReduce的方式启动多线程测试还是通过多线程的方式,如果没有安装MapReduce,或者不想用MapReduce,通常我们采用多线程的方式,因此一般在命令中加上–nomapred来表示不使用MapReduce。
  • oneCon:多线程运行测试时,底层使用一个还是多个连接。这个参数默认值为false,每个thread都会启一个Connection,建议把这个参数设为True,至于原因,后面的章节会讲。
  • valueSize:写入HBase的value的size,单位是Byte,大家可以根据自己实际的场景设置这个Value的大小。
  • blockEncoding:PE工具会自动建表,这个参数用来指定表的block encoding。关于encoding后面会有专门的文章介绍,这里不再讲。
  • table:测试表的名字,如果不设,默认为TestTable。
  • rows:总共测试的行数。注意这里的行数是指单线程的行数,如果rows=100, 线程数为10,那么在写测试中,写入HBase的将是 100 x 10 行。
  • size:总测试的数据大小,单位为GB,这个参数与上面的size是互斥的,不要两个参数一起设。在使用randomReads和randomSeekScans测试时,这个size可以用来指定读取的数据范围。这个值在Read时非常重要,如果设的不好,会产生很多返回值为空的读,影响测试结果,下面会详细介绍。
  • compress:设置表的compress算法,根据自己选择,默认是None,即不做压缩。
  • presplit:表的预分裂region个数,在做性能测试时一定要设置region个数,不然所有的读写会落在一个region上,严重影响性能
  • autoFlush:默认为false,即PE默认用的是BufferedMutator,BufferedMutator会把数据攒在内存里,达到一定的大小再向服务器发送,如果想明确测单行Put的写入性能,建议设置为true。个人觉得PE中引入autoFlush会影响统计的准确性,因为在没有攒够足够的数据时,put操作会立马返回,根本没产生RPC,但是相应的时间和次数也会被统计在最终结果里。

3.PerformanceEvaluation源码改写与实战

我们部门是HBase集群的使用方,只能使用平台封装好的基本api进行调用,并没有到集群节点上直接执行CMD命令的权限;并且原版PerformanceEvaluation参数和功能较复杂,我们这里只是想看看HBase容器集群的极限读写QPS能不能满足要求,与物理集群有多大区别,我并不想去研究这么多配置参数并全部使用。

那么既然它是官方支持的封装工具,而HBase本身也是开源的,那问题就能好解了,直接扒工具源码,自己大概复刻一个简易版本就行了。这里也体现处理平时遇到问题或者疑问多看开源工程源码的好处了,有这种熟悉度和自信,之前其实我还是挺常看源码的,尤其是HBase源码,还进行过一些改造和使用,遇到疑问或者想参考解决方案看开源项目源码也是非常好的。就是最近业务需求比较紧也比较繁杂,搞得没太多时间和沉得下来的心思来看,这一点确实要反思一下自己。

看了一下源码,其实这个PerformanceEvaluation工具本质就是自己伪造一些数据,然后开多线程按目标qps去请求集群就好了,如从0-10000的顺序rowkey写入、0-10000的随机rowkey写入、先从0-10000的顺序rowkey写入再顺序读出、先从0-10000的顺序rowkey写入再随机读出等等。

1.测试参数初始化类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class TestOptions {
static final String RANDOM_SEEK_SCAN = "randomSeekScan";
static final String RANDOM_READ = "randomRead";
static final String PE_COMMAND_SHORTNAME = "pe";
private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluationJava.class.getName());
private static final Gson GSON = GsonUtil.createGson().create();

public static final String TABLE_NAME = "TestTable";
public static final String FAMILY_NAME_BASE = "info";
public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
public static final int DEFAULT_VALUE_LENGTH = 500;
public static final int ROW_LENGTH = 26;

private static final int ONE_GB = 1000 * 10 * 500;
public static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
private static final int TAG_LENGTH = 256;
private static final DecimalFormat FMT = new DecimalFormat("0.##");
private static final MathContext CXT = MathContext.DECIMAL64;
private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);

int startRow = 0;
float size = 1.0f;
int perClientRunRows = DEFAULT_ROWS_PER_GB;
int totalRows = DEFAULT_ROWS_PER_GB;
int period = (this.perClientRunRows / 100) == 0? perClientRunRows: perClientRunRows / 100;
int valueSize = DEFAULT_VALUE_LENGTH;
long bufferSize = 10L;
String tableName;

public TestOptions(String tableName, int threadNum) {
this.tableName = tableName;
this.totalRows = threadNum * DEFAULT_ROWS_PER_GB;
}
}

2.指标统计工具类,后续会有一篇博文专门记录java中的指标统计工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@InterfaceAudience.Private
public final class HistogramUtils {
private HistogramUtils() {}
private static DecimalFormat DOUBLE_FORMAT = new DecimalFormat("#0.00");

public static Histogram newHistogram(Reservoir sample) {
try {
Constructor<?> ctor =
Histogram.class.getDeclaredConstructor(Reservoir.class);
ctor.setAccessible(true);
return (Histogram) ctor.newInstance(sample);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static String getShortHistogramReport(final Histogram hist) {
Snapshot sn = hist.getSnapshot();
return "mean=" + DOUBLE_FORMAT.format(sn.getMean()) +
", min=" + DOUBLE_FORMAT.format(sn.getMin()) +
", max=" + DOUBLE_FORMAT.format(sn.getMax()) +
", stdDev=" + DOUBLE_FORMAT.format(sn.getStdDev()) +
", 95th=" + DOUBLE_FORMAT.format(sn.get95thPercentile()) +
", 99th=" + DOUBLE_FORMAT.format(sn.get99thPercentile());
}
}

3.计算结果类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RunResult implements Comparable<RunResult> {
public RunResult(long duration, Histogram hist) {
this.duration = duration;
this.hist = hist;
}

public final long duration;
public final Histogram hist;

@Override
public String toString() {
return Long.toString(duration);
}

@Override public int compareTo(RunResult o) {
return Long.compare(this.duration, o.duration);
}
}

4.测试实例抽象类,主要封装了一些测试实例的公共方法,如触发开始、记录状态、计时等功能,上述提到的如从0-10000的顺序rowkey写入、0-10000的随机rowkey写入等就是测试实例的实现类型,都需要实现该抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public abstract class TestBase {
public static final int ROW_LENGTH = 26;
public static final String FAMILY_NAME_BASE = "d";
public static final String COLUMN_ZERO = "d0";

protected final TestOptions opts;
private final Status status;
protected Connection connection;
protected Table table;
protected Histogram latencyHistogram;
protected Histogram valueSizeHistogram;
private static final Random randomSeed = new Random(System.currentTimeMillis());
private static long nextRandomSeed() {
return randomSeed.nextLong();
}
protected final Random rand = new Random(nextRandomSeed());

public TestBase(final Connection con, final TestOptions options, final Status status) throws IOException {
this.connection = con;
this.opts = options;
this.status = status;
this.table = connection.getTable(TableName.valueOf(opts.tableName));
}

void testSetup() throws IOException {
// test metrics
latencyHistogram = HistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
valueSizeHistogram = HistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
}

int getStartRow() {
return opts.startRow;
}

int getLastRow() {
return getStartRow() + opts.perClientRunRows;
}

String generateStatus(final int sr, final int i, final int lr) {
return sr + "/" + i + "/" + lr + ", latency " +
HistogramUtils.getShortHistogramReport(this.latencyHistogram) + ", value size " +
HistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
}

void testTimed() throws IOException, InterruptedException {
...
}

abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
}

5.测试实例实现类,如下分别是随机写、顺序写、顺序读的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class RandomdReadTest extends TestBase {
public RandomdReadTest(Connection con, TestOptions options, Status status) throws IOException {
super(con, options, status);
}

@Override
boolean testRow(int i, long startTime) throws IOException, InterruptedException {
Get get = new Get(getRandomRow(this.rand, opts.totalRows));

byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE);
byte [] qualifier = Bytes.toBytes(COLUMN_ZERO);
get.addColumn(familyName, qualifier);
table.get(get);
return true;
}

protected byte[] generateRow(final int i) {
return format(i);
}

public static byte [] format(final int number) {
byte [] b = new byte[ROW_LENGTH];
int d = Math.abs(number);
for (int i = b.length - 1; i >= 0; i--) {
b[i] = (byte)((d % 10) + '0');
d /= 10;
}
return b;
}

static byte [] getRandomRow(final Random random, final int totalRows) {
return format(generateRandomRow(random, totalRows));
}

static int generateRandomRow(final Random random, final int totalRows) {
return random.nextInt(Integer.MAX_VALUE) % totalRows;
}
}


public class SequentialReadTest extends TestBase {
public SequentialReadTest(Connection con, TestOptions options, Status status) throws IOException {
super(con, options, status);
}

@Override
boolean testRow(int i, long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Get get = new Get(row);

byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE);
byte [] qualifier = Bytes.toBytes(COLUMN_ZERO);
get.addColumn(familyName, qualifier);
table.get(get);
return true;
}

protected byte[] generateRow(final int i) {
return format(i);
}

public static byte [] format(final int number) {
byte [] b = new byte[ROW_LENGTH];
int d = Math.abs(number);
for (int i = b.length - 1; i >= 0; i--) {
b[i] = (byte)((d % 10) + '0');
d /= 10;
}
return b;
}
}


public class SequentialWriteTest extends TestBase {
public SequentialWriteTest(Connection con, TestOptions options, Status status) throws IOException {
super(con, options, status);
}

@Override
boolean testRow(int i, long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Put put = new Put(row);

byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE);
byte [] qualifier = Bytes.toBytes(COLUMN_ZERO);
byte[] value = generateData(this.rand, opts.valueSize);
put.addColumn(familyName, qualifier, value);
//this.valueSizeHistogram.update(value.length);

put.setDurability(Durability.SYNC_WAL);
table.put(put);
return true;
}

protected byte[] generateRow(final int i) {
return format(i);
}

public static byte [] format(final int number) {
byte [] b = new byte[ROW_LENGTH];
int d = Math.abs(number);
for (int i = b.length - 1; i >= 0; i--) {
b[i] = (byte)((d % 10) + '0');
d /= 10;
}
return b;
}

/**
* @return Generated random value to insert into a table cell.
*/
public static byte[] generateData(final Random r, int length) {
byte [] b = new byte [length];
int i;

for(i = 0; i < (length-8); i += 8) {
b[i] = (byte) (65 + r.nextInt(26));
b[i+1] = b[i];
b[i+2] = b[i];
b[i+3] = b[i];
b[i+4] = b[i];
b[i+5] = b[i];
b[i+6] = b[i];
b[i+7] = b[i];
}

byte a = (byte) (65 + r.nextInt(26));
for(; i < length; i++) {
b[i] = a;
}
return b;
}
}

6.最后是启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class PerformanceEvaluationJava {
private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluationJava.class.getName());

public static void main(String[] args) throws IOException {
int threadNum = 20;

HBaseConfiguration config = new HBaseConfiguration();
config.set("bdp.hbase.accesskey", "NA4CH6B7RFYAJLV3ZMMAHDFGO4");
config.set("bdp.hbase.instance.name", "CL1000000005186");
//config.set("fs.defaultFS", inputRootPath);
//config.setLong("hbase.rpc.timeout", 3600000);
config.setBoolean("hbase.policy.http.client.v2.enabled", false);

@SuppressWarnings("unchecked")
Future<RunResult>[] threads = new Future[threadNum];
RunResult[] results = new RunResult[threadNum];
ExecutorService pool = Executors.newFixedThreadPool(threadNum,
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
final Connection[] cons = new Connection[threadNum];
for (int i = 0; i < threadNum; i++) {
cons[i] = ConnectionFactory.createConnection(config);
}
LOG.info("Created " + threadNum + " connections for " + threadNum + " threads");

for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = pool.submit(new Callable<RunResult>() {
@Override
public RunResult call() throws Exception {
TestOptions threadOpts = new TestOptions("sz_ela:HB_IBD_SHOP_VIEW_INSHOP_NEW", threadNum);
final Connection con = cons[index % cons.length];
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
TestBase t = new SequentialWriteTest(con, threadOpts, new Status() {
@Override
public void setStatus(final String msg) throws IOException {
LOG.info(msg);
}
});
t.testSetup();
final long startTime = System.nanoTime();
t.testTimed();
t.testTakedown();
long totalElapsedTime = (System.nanoTime() - startTime) / 1000000;
LOG.info("Finished " + Thread.currentThread().getName() + " in " + totalElapsedTime + "ms at offset "
+ threadOpts.startRow + " for " + threadOpts.perClientRunRows + " rows");
return new RunResult(totalElapsedTime, t.latencyHistogram);
}
});
}

pool.shutdown();

for (int i = 0; i < threads.length; i++) {
try {
results[i] = threads[i].get();
} catch (ExecutionException | InterruptedException e) {
throw new IOException(e.getCause());
}
}
LOG.info("[test] Summary of timings (ms): "
+ Arrays.toString(results));
Arrays.sort(results);
long total = 0;
float avgLatency = 0 ;
float avgTPS = 0;
for (RunResult result : results) {
total += result.duration;
avgLatency += result.hist.getSnapshot().getMean();
avgTPS += TestOptions.DEFAULT_ROWS_PER_GB * 1.0f / result.duration;
}
avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length;
LOG.info("[test] duration " + "\tMin: " + results[0] + "ms" + "\tMax: " + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");

for (int i = 0; i < threadNum; i++) {
cons[i].close();
}
}
}

4.测试结果

测试场景 单线程数据量 并发线程数 min(ms) max(ms) tp95(ms) tp99(ms) avg(ms) TPS
容器集群顺序写 10000 40 1.00 1207.00 1.00 101.00 2 16758
物理集群顺序写 10000 40 1.00 664.00 2.00 5.00 2 17311
容器集群随机读 10000 20 1.00 959.00 1.00 1.00 1 16535
物理集群随机读 10000 20 1.00 559.00 1.00 2.00 1 16720

在配额放开到一致的情况下,存算分离容器集群与物理集群读写性能相近,均能达到17000QPS。而且QPS达到峰值之后,CPU也远没有达到性能瓶颈,可见限制HBase集群QPS的并不是CPU计算能力。

物理集群40并发顺序写场景下cpu使用率如下,从日常34%升高至48%:

容器集群40并发顺序写场景下cpu使用率如下,从日常0%升高至6%:

参考文献

HBase——PerformanceEvaluation(压测工具)