HBase_HBaseAPI执行流程与Request&Quota计数

HBase_HBaseAPI执行流程与Request&Quota计数

1.API汇总

1.1客户端的HTable类与服务端的RSRpcServices类

HBase client提供的java api主要在HTable类中:

1
2
3
4
5
6
Result get(Get get);
Result[] get(List<Get> gets);
void put(Put put);
void put(List<Put> puts);
void delete(final Delete delete);
void delete(List<Delete> deletes);

HTable类中操作数据的方法本质都是通过RPC远程调用RegionServer上的数据操作方法。RS上的数据操作方法主要在BlockingInterface接口中定义,在RSRpcServices类中实现:

1
2
3
4
5
6
7
GetResponse get(RpcController var1, GetRequest var2);
MultiResponse multi(RpcController var1, MultiRequest var2);
MutateResponse mutate(RpcController var1, MutateRequest var2);
ScanResponse scan(RpcController var1, ScanRequest var2);
BulkLoadHFileResponse bulkLoadHFile(RpcController var1, BulkLoadHFileRequest var2);
CoprocessorServiceResponse execService(RpcController var1, CoprocessorServiceRequest var2);
CoprocessorServiceResponse execRegionServerService(RpcController var1, CoprocessorServiceRequest var2);

1.2 HTable类的主要属性

1
2
3
4
5
6
7
8
9
10
11
//client通过这个对象连接regionserver
protected ClusterConnection connection;

//线程池,其中一个线程负责连接一个regionserver
private ExecutorService pool;

//异步任务提交器
protected AsyncProcess multiAp;

//rpc工厂类
private RpcRetryingCallerFactory rpcCallerFactory;

1.3 Request计数原理

Request计数是指RSRpcServices类中的requestCount属性,是以regionserver为计数单元的,也就是计算单个regionserver上的request数。

1.4 Quota计数原理

同样通过Quota命令设置的请求量配额也是节点粒度的,是针对单个regionserver上的user/table/namespace,而不是整个表或整个实例。比如说设置某个实例的访问次数quota为10req/s,那就表示存储了该实例所包含的每个regionserver节点,每秒最多接受10个请求。

master在启动时会初始化MasterQuotaManager,负责quota表的管理,根据用户命令在quota表中增删改查配额信息。regionserver在启动时会初始化RegionServerQuotaManager,负责维护一份quota表的缓存,并定期从master中获取quota表信息更新该缓存。

RSRpcServices中被rpc调用的get/multi/mutate/scan等数据操作方法执行前会调用RegionServerQuotaManager的checkQuota方法,根据用户请求解析出user/table/namespace等信息,根据这些信息获取相应的配额信息,并进行计数和判断。

2.put

2.1 put流程源码

源码调用逻辑过多,省略展示。

客户端操作:

  • 把put操作添加到本地writeAsyncBuffer队列里面,符合条件(自动flush或者超过了阀值writeBufferSize)就通过AsyncProcess异步批量提交。
  • 在提交之前,我们要根据每个rowkey找到它们归属的region,这个定位的过程是通过HConnection的locateRegion方法获得的,然后再把这些rowkey按照region分组。用一个Map<ServerName, MultiAction<Row>>类型的对象保存所有put数据,按照regionserver进行分组,要打到不同节点的数据存储在不同的键值对中。而MultiAction<Row>中又使用一个Map<regionName, List<Action<R>>> actions对象存储所有put数据,按照region进行分组。之所以这样分是因为一张表的不同region可能存储在同一个regionserver节点上。
  • 通过多线程并发地为每个不同的regionserver构造一个rpc连接,每个线程异步执行一个MultiServerCallable<Row>任务。任务中的call()方法通过this.getStub().multi()方法rpc远程调服务端的数据操作方法。

服务端操作也就是执行RSRpcServices.multi()方法。

2.2 request&quota计数

  • 通过requestCount.add()方法进行request计数,可以发现multi()方法中每一行put数据request计数就加一。一共有多少行put打到该regionserver上,request计数就加多少。

  • 通过checkQuota()方法进行quota计数,可以发现multi()方法每访问一个region,quota访问次数就加一。打到该regionserver上的批量put一共打到多少个region上,quota访问次数就加多少。如果quota计数超出访问次数配额则抛出异常停止执行。

  • 通过checkQuota()方法进行数据大小预估,批量put操作预估write流量为Put 个数 * 100字节。如果该次quota访问的数据大小超出访问流量配额则抛出异常停止执行。

2.3 put(List)流程

查看源码可以发现put(List)与put流程并没有区别,同样是将put操作一个个添加到本地writeAsyncBuffer队列当中,符合条件就通过AsyncProcess异步批量提交。所以用put(List)来代替put()并不会起到提高写入速度的作用。

3.get

3.1 get流程源码

HTable.get()源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Result get(final Get get, boolean checkExistenceOnly) throws IOException {
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, this.getName(), get.getRow()) {
public Result call(int callTimeout) throws IOException {
GetRequest request = RequestConverter.buildGetRequest(this.getLocation().getRegionInfo().getRegionName(), get);
PayloadCarryingRpcController controller = HTable.this.rpcControllerFactory.newController();
controller.setPriority(this.tableName);
controller.setCallTimeout(callTimeout);

try {
GetResponse response = this.getStub().get(controller, request);
return response == null ? null : ProtobufUtil.toResult(response.getResult());
} catch (ServiceException var5) {
throw ProtobufUtil.getRemoteException(var5);
}
}
};
return (Result)this.rpcCallerFactory.newCaller(this.rpcTimeout).callWithRetries(callable, this.operationTimeout);
}

客户端操作:

直接新建一个callable任务对象,创建一个rpc连接交给服务端执行,通过this.getStub().get()执行数据操作方法。

服务端操作也就是执行RSRpcServices.get()方法。

3.2 request&quota计数

  • 通过requestCount.increment()方法加一。

  • 通过checkQuota()方法进行quota访问次数加一。如果quota计数超出访问次数配额则抛出异常停止执行。

  • 通过checkQuota()方法数据大小预估,单个get操作预估read流量为100字节。如果该次quota访问的数据大小超出访问流量配额则抛出异常停止执行。

3.3 get(List)流程

查看源码可以发现get(List)与put流程惊人相似,只不过没有本地缓存队列。

get(List)也是在客户端按照不同regionserver和region分好组,然后使用AsyncProcess.sendMultiAction() 方法进行rpc调用。为每个regionserver创建一个rpc连接,通过this.getStub().multi()执行数据操作。

所以get(List)的request&quota计数逻辑与put相同。

4.delete

4.1 delete流程源码

HTable.delete()源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void delete(final Delete delete) throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, this.tableName, delete.getRow()) {
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = HTable.this.rpcControllerFactory.newController();
controller.setPriority(this.tableName);
controller.setCallTimeout(callTimeout);

try {
MutateRequest request = RequestConverter.buildMutateRequest(this.getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = this.getStub().mutate(controller, request);
return response.getProcessed();
} catch (ServiceException var5) {
throw ProtobufUtil.getRemoteException(var5);
}
}
};
this.rpcCallerFactory.newCaller(this.rpcTimeout).callWithRetries(callable, this.operationTimeout);
}

客户端操作:

直接新建一个callable任务对象,创建一个rpc连接交给服务端执行,通过this.getStub().mutate()执行数据操作方法。

服务端操作也就是执行RSRpcServices.mutate()方法。

4.2 request&quota计数

  • 通过requestCount.increment()方法加一。

  • 通过checkQuota()方法进行quota访问次数加一。如果quota计数超出访问次数配额则抛出异常停止执行。

  • 通过checkQuota()方法数据大小预估,单个delete操作预估write流量为100字节。如果该次quota访问的数据大小超出访问流量配额则抛出异常停止执行。

4.3 delete(List)流程

查看源码可以发现delete(List)与get(List)流程流程一模一样,所以request&quota计数逻辑也一模一样。

5.scan的request&quota计数

scan流程源码不在赘述,相关源码解析随处可见。

scan会按照setCaching的大小进行分步获取,每当读取完上次获取的数据则通过next创建一个rpc调用服务端的RSRpcServices.scan()方法。

则每次服务端的scan()方法中:

  • 通过requestCount.increment()方法加一。
  • 通过checkQuota()方法进行quota访问次数加一。如果quota计数超出访问次数配额则抛出异常停止执行。
  • 通过checkQuota()方法数据大小预估,单个next操作预估read流量为1000字节。如果该次quota访问的数据大小超出访问流量配额则抛出异常停止执行。