HBase_gRPC&protobuf&HBase基于protobuf创建endpoint协处理器实战

HBase_gRPC&protobuf&HBase基于protobuf创建endpoint协处理器实战

1.RPC

1.1 RPC简介

RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议。它允许像调用本地服务一样调用远程服务。另外RPC是与语言无关的。

rpc框架做的最重要的一件事情就是封装,调用者和被调用者的通讯细节,客户端代理负责将调用方法的方法名、参数、返回值包等信息根据通信协议组织成报文发送给服务端,服务端解析报文,根据客户端传递的信息执行对应的方法,然后将返回值按照协议组织成报文发送给客户端,客户端再解析出来。

1.2 RPC框架的实现

RPC能够让本地应用简单、高效地调用服务器中的方法,它主要应用在分布式系统,要实现一个RPC框架,主要需要考虑以下四个技术的实现:

1)通信模型:假设通信的为A机器与B机器,它们之间要有通信模型,在Java中一般基于BIO或NIO;

2)服务定位:使用给定的通信方式与确定的IP和端口及方法名称,路由到具体的服务和方法;

3)远程代理对象:本地调用的方法(服务)其实是远程方法的本地代理,因此需要一个远程代理对象,对于Java而言,远程代理对象可以使用Java的动态对象实现,封装了调用远程方法调用;

4)序列化:将对象名称、方法名称、参数等对象信息进行网络传输需要转换成二进制传输,需要序列化技术方案,如protobuf,Arvo等。

1.3 Socket通信模型

Socket(套接字)用来描述IP地址和端口,是通信链的句柄,应用程序可以通过Socket向网络发送请求或者应答网络请求。Socket是支持TCP/IP协议的网络通信基本操作但愿,包含了进行网络通信所必须的五种信息:连接协议、本地主机IP、本地主机端口、远程主机IP、远程主机端口。

首先回顾下计算机网络的五(七)层协议:物理层、数据链路层、网络层、传输层、(会话层、表示层)和应用层。那么从协议上来讲,TCP是传输层协议,主要解决数据如何在网络中传输;HTTP是应用层协议,主要解决如何包装数据(文本信息),是建立在tcp协议之上的应用。

TCP协议是以二进制数据流的形式解决传输层的事儿,但对上层的应用开发极不友好。Socket是针对TCP或UDP的具体接口实现,提供了在传输层进行网络编程的方法。而HTTP是一种特定的应用层协议,它使用TCP作为传输层协议,通过Socket来实现数据的传输,可以说Socket是HTTP的基础,而HTTP则是在Socket之上的一种具体应用。

所以RPC跟HTTP不是对立面,也不是同等级,RPC和HTTP的关系只可能是包含关系。RPC可以使用HTTP作为通讯协议,也可以使用其他传输协议。

1.4 gRPC与Dubbo

gRPC与Dubbo都是基于RPC思想实现的远程过程调用框架,两者在实现上的差异如下:

1)通讯协议:gRPC基于Http2.0;Dubbo基于定义TCP。

2)序列化协议:gRPC使用protocol buffer;Dubbo使用hession2等基于java对象的序列化技术,且它的序列化方式可以自行扩展。

3)服务注册与发现:gRPC是应用级别的服务注册;Dubbo2.0及之前的版本都是基于更细力度的服务来进行注册,3.0之后转向应用级别的服务注册。

4)编程语言:gRPC可以使用任何语言编写,Http和Protocol buffer天然就是跨语言的;Dubbo只能使用在构建在JVM之上的语言。

5)服务治理:gRPC自身的治理能力很弱,只能基于Http连接维度进行容错;Dubbo可以基于服务维度进行治理。

两者各有优缺点,gRPC的优势在于跨语言、跨平台,但服务治理能力弱;Dubbo服务治理能力强,但是受编程语言限制无法跨编程语言使用。

1.5 用Java实现简单RPC

下面使用比较原始的方案实现RPC框架,采用基于BIO的Socket实现通信与服务定位、采用java动态代理与反射实现远程代理,采用Java原生序列化实现序列化。RPC服务架构主要分为三个部分:

1)服务提供者:运行在服务器端,提供服务接口定义与服务实现类。

2)服务中心:运行在服务器端,负责将本地服务发布成远程服务提供给服务消费者使用,监听客户端调用。

3)服务消费者:运行在客户端,通过远程代理对象调用远程服务。

1.服务提供者接口定义与实现:

1
2
3
4
5
6
7
8
9
public interface HelloService { 
String sayHi(String name);
}

public class HelloServiceImpl implements HelloService {
public String sayHi(String name) {
return "Hi, " + name;
}
}

2.服务中心接口定义与实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public interface Server {
public void stop();
public void start() throws IOException;
public void register(Class serviceInterface, Class impl);
public boolean isRunning();
public int getPort();
}


public class ServiceCenter implements Server {
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
private static boolean isRunning = false;
private static int port;

public ServiceCenter(int port) {
this.port = port;
}

public void stop() {
isRunning = false;
executor.shutdown();
}

public void start() throws IOException {
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(port));
System.out.println("start server");
try {
while (true) {
// 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
executor.execute(new ServiceTask(server.accept()));
}
} finally {
server.close();
}
}

public void register(Class serviceInterface, Class impl) {
serviceRegistry.put(serviceInterface.getName(), impl);
}

public boolean isRunning() {
return isRunning;
}

public int getPort() {
return port;
}

private static class ServiceTask implements Runnable {
Socket clent = null;

public ServiceTask(Socket client) {
this.clent = client;
}

public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
// 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
input = new ObjectInputStream(clent.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Class serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " not found");
}
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);

// 3.将执行结果反序列化,通过socket发送给客户端
output = new ObjectOutputStream(clent.getOutputStream());
output.writeObject(result);

} catch (Exception e) {
e.printStackTrace();
} finally {
if (output != null) {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (clent != null) {
try {
clent.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}

3.客户端的远程代理对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RPCClient<T> {
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
// 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;

try {
// 2.创建Socket客户端,根据指定地址连接远程服务提供者
socket = new Socket();
socket.connect(addr);

// 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);

// 4.同步阻塞等待服务器返回应答,获取应答后返回
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();

} finally {
if (socket != null) socket.close();
if (output != null) output.close();
if (input != null) input.close();
}
}
});
}
}

4.调用测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RPCTest { 
public static void main(String[] args) throws IOException {
new Thread(new Runnable() {
public void run() {
try {
Server serviceServer = new ServiceCenter(8088);
serviceServer.register(HelloService.class, HelloServiceImpl.class);
serviceServer.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));
System.out.println(service.sayHi("test"));
}
}

这里实现的简单RPC框架是使用Java语言开发,与Java语言高度耦合,并且通信方式采用的Socket是基于BIO实现的,IO效率不高,还有Java原生的序列化机制占内存太多,运行效率也不高,可以考虑从以下几个方面进行改进:

1)可以使用NIO或直接使用Netty替代BIO实现;

2)服务注册可以使用Zookeeper进行管理,能够让应用更加稳定。

3)可以采用基于JSON协议进行序列化的数据传输,也可以使用Hadoop Avro与Google protobuf等开源序列化机制。

2.protobuf

2.1 protobuf简介

Protocol Buffers,是Google公司开发的一种数据描述语言,类似于xml、json能够将结构化数据序列化,可用于数据存储、通信协议等方面,它不依赖于语言和平台且可扩展性极强。

json一般用于web项目中,因为浏览器对于json数据支持非常好,有很多内建的函数支持;xml在webservice中应用最为广泛,但是相比于json,它的数据更加冗余,因为需要成对的闭合标签;protobuf是谷歌开源的一种数据格式,适合高性能,对响应速度有要求的数据传输场景,profobuf本身是二进制数据格式,需要编码和解码,数据本身不具有可读性,因此只能反序列化之后得到真正可读的数据。相比于xml和json,profobuf序列化后数据文件很小,解析速度快,生成了更容易在编程中使用的数据访问。

但是protobuf与xml和json并不是完全等同的层级,将protobuf、xml、json三者放到一起去比较,应该区分两个维度:数据结构化、数据序列化。数据结构化主要面向开发或业务层面,数据序列化面向通信或存储层面,数据结构化侧重人类可读性甚至有时会强调语义表达能力,而数据序列化侧重效率和压缩。

xml作为一种扩展标记语言,json作为源于js的数据格式,都具有数据结构化的能力,尽可能保证其人类可读以便开发人员进行编辑,这就是面向业务或开发层面的数据结构化。json和xml同样也可以直接被用来数据序列化,例如直接采用json和xml进行网络通信传输,此时json和xml就成了一种序列化格式,它发挥了数据序列化的能力。但是实际将json和xml直接作为数据序列化通常并不是最优选择,因为它们在速度、效率、空间上并不是最优,它们更适合数据结构化而非数据序列化。

同样的protobuf也具有数据结构化的能力,其实也就是message定义,我们能够在 .proto文件中,通过message、import、内嵌message等语法来定义数据结构化规则,但是很容易能够看出,protobuf在数据结构化方面和json和xml相差较大,人类可读性较差。但是如果从数据序列化的角度会发现protobuf有着明显的优势,效率、速度、空间几乎全面占优。

2.2 java简单实现protobuf实战

1.引入maven依赖

引入相关依赖,注意这里的依赖版本需要与protoc编辑器一个版本,否则会导致根据.proto文件编辑出的bean文件报语法错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
<!--  protobuf 支持 Java 核心包-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.15.3</version>
</dependency>

<!-- proto 与 Json 互转会用到-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.15.3</version>
</dependency>

2.编写.proto文件

.proto其实就是对数据结构的定义,必须按照该结构存储的数据才能够进行序列化和反序列化,如下编写一个demo.proto文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
//生成 proto 文件所在包路径
package com.wxw.notes.protobuf.proto;
//生成 proto 文件所在包路径
option java_package = "com.wxw.notes.protobuf.proto";
//生成 proto 文件名
option java_outer_classname="DemoProto";

message Demo{
//自身属性
int32 id = 1;
string code = 2;
string name = 3;
}

3.编译.proto文件生产java类

编译器下载地址:https://github.com/protocolbuffers/protobuf/releases,根据依赖protobuf-java包的版本下载对应版本的编译器,下载后解压安装,使用如下查看版本命令检测是否安装成功:(亲测在window上安装和使用更加方便)

1
protoc --version

安装成功后使用如下命令编译.proto文件:

1
protoc --java_out=. demo.proto

将生成后的.java文件放入项目目录中,生成文件如下:

4.protobuf序列化测试

基于生产结构化类创建数据对象,使用protobuf对其序列化和反序列化,并与json进行比较,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SimpleTestMain {
public static void main(String[] args) {
//初始化数据
DemoProto.Demo.Builder demo = DemoProto.Demo.newBuilder();
demo.setId(1)
.setCode("001")
.setName("张三")
.build();

//构建结构化对象
DemoProto.Demo build = demo.build();

//序列化为字节数组
byte[] s = build.toByteArray();
System.out.println("protobuf数据bytes[]:" + Arrays.toString(s));
System.out.println("protobuf序列化大小: " + s.length);

//反序列化为结构化对象
DemoProto.Demo demo1 = null;
try {
demo1 = DemoProto.Demo.parseFrom(s);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}

//结构化对象序列化为json字符串
String jsonStr = null;
jsonStr = JsonFormat.printer().print(demo1);
System.out.println("Json格式化结果:\n" + jsonStr);
System.out.println("Json格式化数据大小: " + jsonStr.getBytes().length);
}
}

比较结果如下:

3.HBase的RPC架构

3.1 HBase的RPC概述

作为一个分布式系统,hbase的设计是典型的master-salve架构。hbase中主要有master,regionserver,client这三个角色,这三个角色之间rpc的调用关系如下图:

client

hbase的client种类有很多,比方说:hbase shell, Java client API等,client只是RPC服务的调用方。

Master

master主要实现了MasterService和RegionServerStatus协议,作为RPC服务的提供方分别供Client和RegionServer调用。

1)MasterService主要定义了获取集群状态,以及获取表的元信息,添加/删除列,assign region, enable/disable table,负载均衡等DML相关的一些服务。而Master提供了对这些服务的实现,并且供客户端去调用。

2)RegionServerStatus主要定义了regionserver向master汇报集群状态,regionserver启动向master发送rpc请求等相关的服务,而master根据这些rpc请求信息,可以了解整个集群中regionserver的状态。

ReginServer

RegionServer主要实现了AdminService和ClientService协议,供client端调用。

1)AmdinService主要定义了获取table Regin信息,操作region(Open,Flush,Split,Compact, Merge等)相关服务。

2)ClientService主要定了get,add,delete,multi,Scan等相关的服务。

注意:RegionServer却只提供了对client的Rpc服务,而没有提供对Master的rpc的服务。当Master想要向RegionServer发送请求时,比如master启动负载均衡时,需要让regionServer移动region时,是通过zookeeper实现的。因为当Master向RegionServer传递信息时,可能需要向多台reginserver传递信息,而通过zookeeper中的node简单变化主动通知regionserver更加方便快捷。

3.2 Client客户端

调度执行

该模块主要提供接口转换、错误重试、服务分组等能力:

1)接口转换:服务层定义的服务接口与用户层的不同,比如put/delete/increment/append等操作底层都是调用的mutate接口;而batch相关的操作,无论是读还是写都调用multi接口。client调用call方法后,首先会把传入的参数封装成call对象(该对象包含方法名称,调用参数,连接地址等信息),client端有一个Map对象connections,缓存了连接信息。最后将所有信息封装为一个callable对象,交由RpcRetryingCaller处理。

2)错误重试:RpcRetryingCaller负责与服务代理模块直接交互 ,以及错误时的重试;

3)服务分组:batch相关的操作可能会涉及到多个rs,需要按照rs进行分组,然后多线程并发请求,这些逻辑是在AsyncProcess中;对于非batch类请求则直接使用RpcRetryingCaller进行调用,AsyncProcess的内部实际上也是依赖了该类来执行单个rs请求。

服务代理

服务代理通常叫stub,即桩的意思,其实现了与服务端同样的接口,对调度执行模块而言,调用stub的方法就相当于调用远程的服务,而不必关心实现细节。

这部分依赖protobuf组件,通过在proto文件中定义service及message类型的参数,可直接生成接口和stub实现类。

在ConnectionImplementation类中有一个Map类型的stubs变量,其key为service name + regionserver,value则是具体每个regionserver对应的stub实例。

通信模块

该模块主要进行序列化和io处理,目前HBase已采用netty作为底层的io框架,客户端的核心类为NettyRpcClient。

序列化则是依赖protobuf组件,序列化与反序列化的逻辑都放在NettyRpcDuplexHandler中,该类注册在netty的pipeline,会基于不同的事件自动调用。

3.3 Server服务端

通信模块

该模块主要负责数据的读取、反序列化并封装为call对象,核心实现类为NettyRpcServer,通过在pipeline中注册的一些handler来完成上述处理。call对象中的数据包括:请求的方法名、优先级、超时时间等总体描述,详情见RPC.proto文件中的RequestHeader;GetRequest、MutateRequest等所请求方法的参数,详情见Client.proto和HBase.proto文件中的相关定义。

调度执行

通信模块得到的call对象会交由rpcScheduler进行调度,目前默认实现为SimpleRpcScheduler。rpcScheduler的主要作用是根据请求类型把请求分配给不同的rpcExecutor实例,请求类型有3种:普通请求、高优先级请求和replication请求,而rpcExecutor的实现目前主要由RWQueueRpcExecutor和FastPathBalancedQueueRpcExecutor两种,不同的类型使用了不同实现,关系如下:

RWQueueRpcExecutor的特点是内部可以对读写隔离,以及对get和scan隔离,所谓隔离的意思是,call对象会放入独立的callQueue,并使用独立的handler进行处理:

FastPathBalancedQueueRpcExecutor不支持隔离,其特点是对于空闲的handler,让其自旋而不是阻塞,以减少线程上下文切换的消耗:

服务实现

服务端实现类需要实现一些接口,HMaster的服务实现类主要是MasterRpcServices,HRegionServer的服务实现类主要是RSRpcServices。service相关的类会在启动阶段进行初始化,然后在请求处理时根据connection的serviceName获取到对应的service实例,再根据call对象的method和param进行方法的调用。

4.HBase的Coprocessors编写实战

4.1 HBase的Coprocessors简介

在使用hbase时,当数据量非常大,即使网络传输带宽允许,客户端的计算能力也未必能满足要求。协处理器Coprocessors就是为了解决该问题而出现的,它将写好的业务逻辑代码部署在服务端,在服务端执行客户端远程调用前后先执行预设的业务逻辑代码,这样极大地降低了需要传输地数据量,也降低了对客户端计算能力的要求。

Coprocessors也可以帮助用户扩展实现原生HBase目前所不具备的功能,如权限校验、二级索引等。

Coprocessors可以全局导入作用在某个hbase集群的所有表上,也可以设置单独作用在某一张指定的表上。

Coprocessors主要有两类:

Observer

Observer就是最典型的AOP,当服务端发生某些事件时,这类协处理器会在事件发生前后被调用。常用来实现如下功能:

1)权限校验:比如在执行get或put操作之前,编写preget或preput方法检查权限;

2)完整性约束:hbase不支持像关系型数据库中那样的外键功能,可以通过Observer实现在插入或删除数据时,对其他表中的关联数据进行对应操作。

3)二级索引:也可以借助Observer来实现二级索引,实际上就是在读和写之前添加一步寻址操作。

目前hbase支持进行扩展的Observer协处理器主要有四种类型:

1)RegionObserver:允许在region级别事件前后添加业务逻辑,例如get和put操作,具体可以在哪些事件前后添加业务逻辑详情见BaseRegionObserver类源码;

2)RegionServerObserver:允许在regionserver级别事件前后添加业务逻辑,例如启动、停止、执行合并、提交、回滚等,详情见BaseRegionServerObserver源码;

3)MasterObserver:允许在master级别事件前后添加业务逻辑,例如表的创建、删除、schema修改,详情见BaseMasterObserver源码;

4)WalObserver:允许在WAL级别事件前后添加业务逻辑,详情见BaseWalObserver源码。

上述四种Observer接口都继承自Coprocessors接口,这四个接口中定义了所有可以实现AOP的钩子方法,我们一般直接继承其Base实现类,重写必要的方法。

Endpoint

Endpoint与Observer有很大的不同,实现形式也不是AOP形式的,要更加复杂一些。Endpoint代码需要实现CoprocessorService、Coprocessor这两个接口,然后部署在服务端。其中的业务逻辑主要是调用regionserver提供的数据操作接口来实现行数计算、最值计算等聚合计算操作。客户端可以通过rpc直接远程调用Endpoint方法返回结果。

在我看来,Observer相当于AOP,可以在服务端数据操作接口前后添加业务逻辑;而Endpoint相当于直接创建新的服务端数据操作接口,可供客户端调用。

4.2 借助protobuf实现endpoint协处理器实战

1.编写proto协议文件

本次实战实现的是获取最大值的endpoint协处理器,编写getmax.proto协议文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
option java_package = "com.jd.rtc.proto";

option java_outer_classname = "GetMaxProto";
option java_generic_services = true;

message maxRequest {
required string family = 1;
required string column = 2;
}
message maxResponse {
required double max = 1 [default = 0];

}

service maxService {
rpc getmax(maxRequest)
returns (maxResponse);
}

2.使用protoc编译proto协议文件

由于使用的是hbase1.1.6版本,适配的protobuf是2.5.0版本,那么我们需要找到对应的protoc编译器版本。在mac上直接使用homebrow只能下载安装最新版本的protoc,我只能从github上下载2.5.0的protoc包并进行本地安装了。然后使用上文中的protoc编译命令得到如下适配protobuf2.5.0的java类文件:

3.实现协处理器server类

创建协处理器server类继承protobuf的服务类GetMaxProto.maxService,并实现HBase的协处理器接口Coprocessor和CoprocessorService,这个server实现类就是需要打包到jar包中并安装到HBase的regionserver上去的endpoint具体操作类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class GetMaxServer extends GetMaxProto.maxService implements Coprocessor, CoprocessorService {

protected Logger LOG = Logger.getLogger(GetMaxServer.class);

private RegionCoprocessorEnvironment env;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}

@Override
public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {

}

@Override
public Service getService() {
return this;
}

@Override
public void getmax(RpcController controller, GetMaxProto.maxRequest request, RpcCallback<GetMaxProto.maxResponse> done) {
String family = request.getFamily();
if ("".equals(family)) {
throw new NullPointerException("you need specify the family");
}
String column = request.getColumn();
if ("".equals(column)) {
throw new NullPointerException("you need specify the column");
}

Scan scan = new Scan();

// 设置扫描对象
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));

// 定义变量
GetMaxProto.maxResponse response = null;
InternalScanner scanner = null;

// 扫描每个region,取值后求和
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
Double max = null;
do {
hasMore = scanner.next(results);
//for (Cell cell : results) {
if (results.isEmpty()) {
continue;
}
Cell kv = results.get(0);
try {
Double temp = Double.parseDouble(new String(CellUtil.cloneValue(kv)));
max = max != null && (temp == null || compare(temp, max) <= 0) ? max : temp;
} catch (Exception e) {
}

results.clear();
//sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));

//results.clear();
} while (hasMore);
// 设置返回结果
response = GetMaxProto.maxResponse.newBuilder().setMax((max !=null ? max.doubleValue():Double.MAX_VALUE)).build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException e) {
//e.printStackTrace();
}
}
}
// 将rpc结果返回给客户端
done.run(response);
}

public static int compare(Double l1, Double l2) {
if (l1 == null ^ l2 == null) {
return l1 == null ? -1 : 1; // either of one is null.
} else if (l1 == null)
return 0; // both are null
return l1.compareTo(l2); // natural ordering.
}
}

4.打包和安装jar包

配置好对应的maven依赖,HBase等依赖包的版本与服务的版本必须适配,将上述协处理器打包成jar包。将jar包上传到HBase集群所在hdfs目录,并使用协处理器安装命令对指定表进行协处理器安装。

协处理器安装命令:

1
alter "fellowshell", method => "table_att", "coprocessor" => "hdfs://localhost:9005/hbase/coprocess/coprocessor_test-1.0-SNAPSHOT.jar|com.jd.rtc.coprocess.GetMaxServer|"

协处理器卸载命令:

1
alter "fellowshell", METHOD => "table_att_unset", NAME => "coprocessor$1"

5.编写client测试请求逻辑

创建封装请求参数的call实现类,并编写多个regionserver返回值的merge操作,对于getmax方法的merge操作就是再比较取最大值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class GetMaxTest {
//设置参数
private static final String TABLE_NAME = "test2";
private static final String FAMILY = "f1";
private static final String COLUMN = "PBRL";
private static final byte[] STRAT_KEY = Bytes.toBytes("00");
private static final byte[] END_KEY = Bytes.toBytes("ff");

public static void main(String[] args) throws Exception {

// 配置HBse
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "linux3,linux4,linux5");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.setLong("hbase.rpc.timeout", 600000);
System.setProperty("hadoop.home.dir", "F:/ruanjian/hadoop-2.6.0-cdh5.14.0");

// 建立一个连接
Connection conn = ConnectionFactory.createConnection(conf);
// 获取表
HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));

// 设置请求对象
final GetMaxProto.maxRequest request = GetMaxProto.maxRequest.newBuilder().setFamily(FAMILY).setColumn(COLUMN).build();

try {
// 获得返回值
Map<byte[], Double> result = table.coprocessorService(GetMaxProto.maxService.class, STRAT_KEY, END_KEY,
new Batch.Call<GetMaxProto.maxService, Double>() {
@Override
public Double call(GetMaxProto.maxService maxService) throws IOException {
BlockingRpcCallback<GetMaxProto.maxResponse> rpcCallback = new BlockingRpcCallback<GetMaxProto.maxResponse>();
maxService.getmax(null, request, rpcCallback);
GetMaxProto.maxResponse response = rpcCallback.get();
return response.getMax();
}
});
Double max = null;
// 将返回值进行迭代相加
for (Double temp : result.values()) {
max = max != null && (temp == null || compare(temp, max) <= 0) ? max : temp;
}
// 结果输出
System.out.println("max: " + max);

} catch (ServiceException e) {
e.printStackTrace();
}catch (Throwable e) {
e.printStackTrace();
}
table.close();
conn.close();
}

public static int compare(Double l1, Double l2) {
if (l1 == null ^ l2 == null) {
return l1 == null ? -1 : 1; // either of one is null.
} else if (l1 == null)
return 0; // both are null
return l1.compareTo(l2); // natural ordering.
}
}

参考文献

简单实现gPRC案例Tutorial: Using Google RPC/ProtocolBuffers for Remote Services

HBase rpc框架介绍

hbase rpc这点事

hbase协处理器endpoint示例(求最值)