【AgileTC】_WebSocket服务器应用
1.AgileTC开源版本
- 访问映射路径:”/api/case/{caseId}/{recordId}/{isCore}/{user}”
- 本质是使用一个ConcurrentHashMap<String, WebSocket>来保存所有websocket连接。
- save方法
- 找到当前所有连接到了该对象(用例集或者测试任务)上的websocket连接,并找到其中更新时间最大的那一个对象;
- 将更新时间最大的对象与数据库中该对象的更新时间进行比较,如果数据库中对象更新时间更大,则将数据中对象的树型结构内容赋值给所有连接对象,并将所有连接的数据对象更新时间设置为0,然后返回;
- 反之,则将跟新时间最大的对象中的树型结构内容复制给所有连接对象,并将所有连接的数据对象更新时间设置为0,并将其保存到数据库中该对象上去,并更新数据库中的跟新时间为当前时间。
- @OnOpen注解方法
- 当有用户浏览器访问映射路径时,会创建一个WebSocket类的实例对象并触发该方法;
- 如果当前服务器中有访问同一对象的其他websocket连接则save方法一下该对象,确保数据库中和其他所有连接中的内容已经是最新的;
- 将新连接加入到记录连接的hashmap中去,并将数据库中该对象的数据通过websocket连接中的session发送到客户端浏览器中。
- @OnClose注解方法
- 当客户端浏览器关闭映射路径对应页面时就会执行该方法;
- 会先执行save方法,此时的所有连接包括正要关闭的这个连接;
- 然后将该连接移除记录所有连接的hashmap。
- @OnMessage注解方法
- 当websocket连接中的服务器端收到用户端发来的信息时会执行该方法;
- 如果该消息是pong消息则无事发生;
- 将该消息发送给所有访问该对象的客户端浏览器,包括其他用户和自己,并根据该消息修改本连接中的树型结构内容,把本连接的更新时间设置为当前时间戳;(注意此处并没有将访问该对象的其他连接中的树型结构内容更新,也就是说此时其他用户的websocket连接中后端对象中的树型结构与前端浏览器显示的树型结构并不是一样的,前端浏览器中显示的全是最新的。)
- @OnError注解方法
- 当websocket连接出现异常时会执行该方法;
- 会先执行save方法,此时的所有连接包括正要关闭的这个连接;
- 然后将该连接移出记录所有连接的hashmap。
2.AgileTC滴滴内部版
WebSocket对象主要属性
- caseContent:用例树形结构对象序列化字符串;
- updateTime:更新时间,分为用例更新时间和任务更新时间。
三个比较重要的集合:
- ConcurrentHashMap<String, Integer>类型的userInfo用于保存当前连接的用户,其中key为用户名字符串,值为同一用户创建的连接个数;
- ConcurrentHashMap<String, WebSocket>类型的webSocket用于保存当前建立连接的WebSocket对象,其中key为用例id+任务id+sessionid,值为WebSocket对象;
- List<String>类型的keys用于保存所有的session信息,其中元素数据为用例id+任务id+sessionid。
WebSocket同步
- 容器会为每一个WebSocket连接创建一个EndPoint的实例,用实例变量来保存一些状态信息,也就是说WebSocket对象是多例的;
- 为@OnOpen、@OnClose、@OnMessage、@OnError四个方法加上synchronized锁,因为这四个方法都是非静态方法,所以为这四个方法加上的synchronized锁的锁对象就是其所在的WebSocket实例对象,也就是说同一个连接中并发执行的这四个方法会被同步,而不同连接中并发执行的这四个方法并不会被同步。
静态代码块用于创建一个线程池,执行任务:定时向所有WebSocket连接的客户端发送ping消息,坚持连接是否正常;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27static {
log.info("[线程池执行ping-pong] time = {}", System.currentTimeMillis());
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> {
try {
// 看看是不是链接关闭了,如果没有收到就关闭
while (true) {
for (Map.Entry<String, WebSocket> entry : webSocket.entrySet()) {
if (!UNDEFINED.equals(entry.getValue().caseId)) {
entry.getValue().sendMessage(PING_MESSAGE);
// 看看是不是过时的内容
if (System.currentTimeMillis() - entry.getValue().pongTimeStamp > 6000) {
log.error("[线程池执行ping-pong出错]准备关闭当前websocket={}", entry.getValue().toString());
entry.getValue().onClose();
}
}
}
Thread.sleep(5000);
}
} catch (Exception e) {
log.error("[线程池执行ping-pong出错]错误原因e={}", e.getMessage());
e.printStackTrace();
}
});
}updateCaseMessage()方法
- 将当前所有同用例id或任务id的WebSocket对象中的caseContent更新为最新的,更新时间设置为0;数据库中对应的用例或任务也更新为最新的。
@OnOpen注解方法
- 1.利用ddmq消息队列使所有服务器都执行updateCaseMessage()方法。这里之所以使用消息队列,是因为在内部版该项目是集群部署,必须使用消息队列使集群中每一台服务器都收到该更新信息,以使得集群中而开源版是单机部署则不需要使用消息队列;
- 2.从缓存或者数据库中拉取获取caseContent信息,发送给前端渲染成树型图,并保存到缓存中;
- 3.将相应数据保存到上述三个集合中,其中userInfo的更新也需要使用ddmq消息队列,并更新该webSocket对象中的caseContent字符串和更新时间;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public synchronized void onOpen(@PathParam(value = "caseId") String caseId,
@PathParam(value = "recordId") String recordId,
@PathParam(value = "isCore") String isCore,
@PathParam(value = "user") String user,
Session session) {
try {
log.info("[websocket-onOpen 尝试开启新的session][{}] caseId={}, recordId={}, isCore={}",
recordId == null || UNDEFINED.equals(recordId) ? "测试用例" : "执行任务", caseId, recordId, isCore);
this.session = session;
this.caseId = stringFill(caseId, 8, '0', true);
this.recordId = stringFill(recordId, 8, '0', true);
this.isCore = isCore;
this.user = user;
this.updateCaseTime = 0;
this.updateRecordTime = 0;
this.pongTimeStamp = System.currentTimeMillis();
log.info("[websocket-onOpen 开启新的session成功]当前session={}", toString());
// 其实这里只是将需要跟新的用例id或者任务id发送到ddmq消息队列,
// 消费者收到消息后会使用从消息队列获得的信息执行本类中的updateLatestCase方法。
wsMsgAnycProducerService.updateCaseMessage(this.caseId,this.recordId,isCore,user);
//打开用例
String key = stringFill(this.caseId, 8, '0', true) + stringFill(this.recordId, 8, '0', true)+isCore;
if (codis.exists(key)) {
log.info("[websocket-onOpen 从redis获取内容] key={}", key);
// 这里是将本用例或任务的content树信息从redis缓存中发送给前端
sendMessage(codis.get(key));
} else {
log.info("[websocket-onOpen 从db获取内容] caseId={}, recordId={}", caseId, recordId);
// 缓存中没有,就从数据库中去取content树信息,发送给前端并保存到redis缓存中
open(this.caseId, this.recordId, isCore);
}
// "2".equals(isCore)表示该用例为冒烟用例,无需执行更新操作
if (!"2".equals(isCore) && !UNDEFINED.equals(this.caseId)) {
String session_key = this.caseId + this.recordId + stringFill(session.getId(), 8, '0', true);
keys.add(session_key);
webSocket.put(session_key, this);
// 将用户信息发送到ddmq消息队列,消费者收到消息后会将用户信息更新到userinfo中。
wsMsgAnycProducerService.addUser(user);
// 刚创建的这个WebSocket对象中的caseContent更新为最新content,更新时间设为0
webSocket.get(session_key).caseContent = codis.get(key);
webSocket.get(session_key).updateCaseTime = 0L;
}
log.info("[websocket-onOpen 打开用例成功]打开的用例id={}, 当前用户数量={}", this.caseId, webSocket.size());
} catch (Exception e) {
log.error("[websocket-onOpen 打开用例失败]session={}, 当前用户数量={}", toString(), e.getMessage());
e.printStackTrace();
}
}@OnClose注解方法
- 1.在当前服务器中updateCaseMessage();
- 2.用ddmq通知集群中其他服务器,对该用例或任务也进行updateCaseMessage();
- 3.更新缓存中的用例或任务;
- 4.三个集合删除相应数据。
@OnMessage注解方法
- 1.将客户端修改后发回来的caseContent版本号+1重新发回客户端;
- 2.使用消息队列使当前所有服务器中同用例id或任务id的所有WebSocket对象中的caseContent可靠更新;
- 3.更新缓存。
@OnError注解方法
- 返回错误信息给客户端。