HBase_Java离线脚本或单机部署服务中使用Queue队列实现限流和削峰实战

HBase_Java离线脚本或单机部署服务中使用Queue队列实现限流和削峰实战

1.Queue读写缓冲区

在HBase推数脚本中，当使用多线程并发写入来提升写入速度时，就涉及线程间的数据传递。由于一个实例只创建一个数据库连接，需要把多个线程产生的数据汇总后再写入数据库；为避免数据不一致问题，这里创建一个线程安全的Queue（ConcurrentLinkedQueue）来承接所有线程的写入。

另外，还可以配合AtomicLong统计队列中待写入数据所占用的堆内存大小（即各Put的heapSize之和），以此限制Queue的最大容量（示例中上限为100MB）。达到上限后阻塞写入线程，可以防止数据流入速度大于流出速度而导致JVM内存占用过大，从而实现削峰效果。

实战Writer如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
 * HBase data import writer.
 *
 * <p>Implements {@link ITableWriter}: each {@link HBaseRowData} received is converted to a
 * {@link Put} and written through a {@link BufferedMutator}. It runs either serially (the put is
 * mutated on the caller's thread) or in multi-threaded mode. In multi-threaded mode, callers
 * enqueue puts and this thread drains the queue into the mutator.
 *
 * <p>In multi-threaded mode, the summed {@code heapSize()} of queued puts is capped at
 * {@link #MAX_QUEUED_HEAP_BYTES}. Producers sleep-poll while the cap is reached, which bounds JVM
 * memory when input arrives faster than HBase writes complete.
 */
public class HBaseTableWriter extends Thread implements ITableWriter {
    /** Soft upper bound on the summed Put.heapSize() of queued puts: 100 MB. */
    private static final long MAX_QUEUED_HEAP_BYTES = 100L * 1024 * 1024;
    /** How long a blocked producer sleeps before re-checking the heap cap. */
    private static final long PRODUCER_BACKOFF_MILLIS = 100;
    /** How long the drain thread sleeps when the queue is empty, instead of busy-spinning. */
    private static final long IDLE_POLL_MILLIS = 1;
    /** Failures are recorded for the final report only while the failure count is below this. */
    private static final int MAX_RECORDED_FAILURES = 100;

    /** HBase table mutator; created in init(). */
    private BufferedMutator table;

    /**
     * Set by close() so that the drain thread exits once the queue is empty. It is volatile because
     * one thread writes it and the drain thread reads it in its loop.
     */
    private volatile boolean dieWhileDataOver;
    /** Buffer between producer threads (writeRow/addPut) and the drain thread (run). */
    private final Queue<Put> queue = new ConcurrentLinkedQueue<Put>();

    /** Number of rows passed to writeRow; atomic because writeRow may be called concurrently. */
    private final java.util.concurrent.atomic.AtomicInteger hbaseWriteRowNum =
            new java.util.concurrent.atomic.AtomicInteger(0);
    /** HBase table name. */
    private final String tableName;
    /** Whether multi-threaded (queue + drain thread) mode is enabled. */
    private boolean enableThread;

    /** Summed heapSize() of puts currently waiting in the queue. */
    private final AtomicLong queuedHeapBytes = new AtomicLong(0);
    /** Summed heapSize() of puts successfully written by the drain thread. */
    private final AtomicLong putSize = new AtomicLong(0);

    DataPickerConfiguration dpConf;

    public HBaseTableWriter(String tableName) {
        this.tableName = tableName;
        this.dpConf = DataPickerConfiguration.getConfiguration();
    }

    /**
     * Opens the mutator, resets counters and, in multi-threaded mode, starts the drain thread.
     *
     * @throws IOException if the mutator cannot be obtained
     */
    @Override
    public void init() throws IOException {
        DataClient dataClient = ClientFactory.getDataClient(
                dpConf.getInstanceName(), dpConf.getAccessKey(), dpConf.getNamespace());
        table = dataClient.getMutator(getTableName(), dpConf.getHbaseWriteBufferSize());
        enableThread = DataPickerConfiguration.getConfiguration().isEnableMultiThread();
        // Reset state BEFORE starting the drain thread. Thread.start() publishes these writes to the
        // new thread, whereas resetting afterwards could race with (or discard) early puts.
        hbaseWriteRowNum.set(0);
        dieWhileDataOver = false;
        queue.clear();
        queuedHeapBytes.set(0);
        if (enableThread) {
            this.start();
        }
    }

    /**
     * Writes one row. In multi-threaded mode the put is enqueued for the drain thread;
     * otherwise it is mutated directly on the calling thread.
     */
    @Override
    public void writeRow(HBaseRowData rowData) throws IOException {
        hbaseWriteRowNum.incrementAndGet();
        Put put = rowData.toPut();
        if (enableThread) {
            addPut(put);
        } else {
            getHTable().mutate(put);
        }
    }

    @Override
    public String getTableName() {
        return tableName;
    }

    /**
     * In multi-threaded mode, asks the drain thread to finish the remaining queue and waits for it.
     * Then closes the mutator. Does nothing to the mutator if init() was never called.
     */
    @Override
    public void close() throws IOException, InterruptedException {
        if (enableThread) {
            this.setDieWhileDataOver(true);
            this.join();
        }
        if (table != null) {
            // NOTE(review): presumably close() flushes buffered mutations (as HBase's
            // BufferedMutator does); confirm for the mutator returned by DataClient.
            table.close();
        }
    }

    /**
     * Adds a put to the output queue, blocking while the queue is "full".
     *
     * <p>"Full" means the summed heapSize() of queued puts has reached MAX_QUEUED_HEAP_BYTES. The
     * caller then sleeps in PRODUCER_BACKOFF_MILLIS steps until the drain thread frees space.
     * This prevents unbounded memory growth when output cannot keep up with input. The check
     * and the add are not one atomic step, so with several producers the cap is soft: it may be
     * exceeded by up to one put per producer.
     *
     * <p>The wait is not interruptible. An interrupt received while waiting is remembered and
     * re-asserted on the calling thread once the put has been enqueued.
     *
     * @param put the put to enqueue
     */
    public void addPut(Put put) {
        boolean interrupted = false;
        try {
            while (queuedHeapBytes.get() >= MAX_QUEUED_HEAP_BYTES) {
                try {
                    Thread.sleep(PRODUCER_BACKOFF_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            // Account before offering, so the drain thread's decrement never precedes the increment.
            queuedHeapBytes.addAndGet(put.heapSize());
            if (!queue.offer(put)) {
                queuedHeapBytes.addAndGet(-put.heapSize());
                PickerLog.logError(Bytes.toStringBinary(put.getRow()) + " addPut error occurs.");
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sets whether the drain thread should exit once the queue has been emptied. */
    public void setDieWhileDataOver(boolean dieWhileDataOver) {
        this.dieWhileDataOver = dieWhileDataOver;
    }

    /**
     * Drain-thread body: repeatedly takes puts from the queue and mutates them into HBase.
     *
     * <p>Exits when interrupted, or when a stop has been requested and the queue is drained. On
     * exit it reports totals to PickerMonitor. If any writes failed, it logs the collected
     * exceptions (failures numbered below MAX_RECORDED_FAILURES) as a MultiException.
     */
    @Override
    public void run() {
        List<Throwable> exceptions = new ArrayList<Throwable>();
        int rowNum = 0;
        int rowFailed = 0;
        while (!Thread.currentThread().isInterrupted()) {
            // Read the stop flag BEFORE polling. If it was already set, every put offered before the
            // flag was raised is visible to this poll, so an empty result means "fully drained".
            // Checking the flag only after an empty poll could drop a put enqueued in between.
            boolean stopRequested = dieWhileDataOver;
            Put put = queue.poll();
            if (put == null) {
                if (stopRequested) {
                    break;
                }
                idle();
                continue;
            }
            // Release the queue budget whether or not the write succeeds. Otherwise each failed put
            // would permanently shrink capacity and could eventually block producers forever.
            queuedHeapBytes.addAndGet(-put.heapSize());
            try {
                this.getHTable().mutate(put);
                rowNum++;
                putSize.addAndGet(put.heapSize());
            } catch (Exception e) {
                rowFailed++;
                if (rowFailed < MAX_RECORDED_FAILURES) {
                    exceptions.add(e);
                }
            }
        }
        int totalRows = hbaseWriteRowNum.get();
        PickerMonitor.setDestTotal(totalRows);
        PickerMonitor.setDestSucc(rowNum);
        PickerMonitor.setPutSize(putSize.get());
        if (!exceptions.isEmpty()) {
            MultiException exp = new MultiException(exceptions, totalRows - rowNum, totalRows);
            PickerLog.logError("error occurs while put table data: ", exp);
        }
    }

    /** Sleeps briefly while the queue is empty to avoid burning a CPU core; keeps interrupt status. */
    private void idle() {
        try {
            Thread.sleep(IDLE_POLL_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public BufferedMutator getHTable() {
        return table;
    }
}

2.令牌桶算法实现限流

以下是一个使用队列实现限流的简化示例，大致模拟了令牌桶算法的逻辑：1. 创建一个队列作为令牌桶；2. 由后台线程或定时任务以固定速率向队列中添加“令牌”（本例以时间戳作为令牌），直到达到令牌桶的最大容量；3. 请求到达时，尝试从队列中获取一个令牌；4. 如果获取到令牌，则允许请求继续执行；5. 如果无法获取令牌（即队列为空），则拒绝请求或让其排队等待，从而实现限流。

通过调整后台任务添加令牌的速率以及令牌桶的容量，就可以控制请求频率。实现代码如下：

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Simple token-bucket rate limiter backed by a queue.
 *
 * <p>A background thread adds one token (the current timestamp) every {@code tokenAddingRate}
 * milliseconds, until the bucket holds {@code maxTokens}. {@link #tryAcquire()} consumes one
 * token if one is available. The refill thread is a daemon thread, so a limiter that is never
 * closed does not keep the JVM alive. Call {@link #close()} to stop refilling explicitly.
 */
public class RateLimiter implements AutoCloseable {
    private final Queue<Long> tokenBucket;
    private final int maxTokens;
    private final long tokenAddingRate; // in milliseconds
    private final ScheduledExecutorService scheduler;

    /**
     * Creates the limiter and immediately starts refilling; the first token is added right away.
     *
     * @param maxTokens bucket capacity; must be positive
     * @param tokenAddingRate refill period in milliseconds (one token per period); must be positive
     * @throws IllegalArgumentException if either argument is not positive
     */
    public RateLimiter(int maxTokens, long tokenAddingRate) {
        if (maxTokens <= 0) {
            throw new IllegalArgumentException("maxTokens must be positive: " + maxTokens);
        }
        if (tokenAddingRate <= 0) {
            throw new IllegalArgumentException(
                    "tokenAddingRate must be positive: " + tokenAddingRate);
        }
        this.maxTokens = maxTokens;
        this.tokenBucket = new java.util.ArrayDeque<>();
        this.tokenAddingRate = tokenAddingRate;
        // Daemon thread: the original non-daemon, never-shut-down executor kept the JVM running
        // forever after main() returned.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "rate-limiter-refill");
            thread.setDaemon(true);
            return thread;
        });
        startTokenAddingTask();
    }

    /** Schedules the periodic refill: one token per period, never exceeding maxTokens. */
    private void startTokenAddingTask() {
        scheduler.scheduleAtFixedRate(() -> {
            synchronized (tokenBucket) {
                if (tokenBucket.size() < maxTokens) {
                    tokenBucket.add(System.currentTimeMillis());
                }
            }
        }, 0, tokenAddingRate, TimeUnit.MILLISECONDS);
    }

    /**
     * Attempts to take one token without waiting.
     *
     * @return true if a token was consumed (request allowed), false if the bucket was empty
     */
    public boolean tryAcquire() {
        synchronized (tokenBucket) {
            return tokenBucket.poll() != null;
        }
    }

    /** Stops refilling. Tokens already in the bucket can still be acquired. Safe to call twice. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        try (RateLimiter rateLimiter = new RateLimiter(5, 1000)) {
            // Emulating 10 requests
            for (int i = 0; i < 10; i++) {
                System.out.println("Request " + i + ": " + (rateLimiter.tryAcquire() ? "allowed" : "limited"));
                try {
                    Thread.sleep(200); // Adjust the sleep to emulate different request intervals
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}