Java服务_SV服务缓存全量更新和增量更新方案

Java服务_SV服务缓存全量更新和增量更新方案

一、背景

目前元数据更新方案:

此处说的元数据中心是指分布式缓存Redis中的指标服务元数据,元数据每5分钟更新一次,更新全量元数据。

为什么需要增量更新:

a、数据一致性:全量更新每5分钟元数据更新一次,用户影响路由的变动需要至多5分钟才能生效,影响用户体验,特别是对于频繁发生变动(调试)的预发环境来说。

b、稳定性:生产消费联动的能力基础,比如某张表能不能用要及时的感知,双流策略切换要及时感知等。

二、全局链路与更新范围

1.全局链路

元数据更新涉及到后端BE、元数据中心META、定义驱动SV三部分。

img

增量更新和全量更新并行:可能会出现问题,考虑增量更新和全量更新用一把锁,全量更新抢占锁的权重更大,详细设计见元数据中心的增量更新技术方案。

增量更新并行:改为顺序消费MQ的消息的话可以防止此问题。

2.定义驱动更新范围:

1)属于定义驱动且需要增量更新的元数据:目前存在8种元数据,BE需要更新逻辑表,SV需要更新指标、修饰、维度、逻辑表、二级索引。

2)属于定义驱动但不需要更新的元数据:已经不在界面上维护,更新频率较低,因此5分钟的全量更新即可满足更新需求。

img

三、详细设计

1.整体更新流程

全量更新链路为:到达更新时间点(定时任务),元数据中心从Mysql数据库中拉取数据并构建元数据Bean,将这些元数据Bean实例转化为Json字符串存入Redis中,更新完后发送MQ消息;定义驱动工程接收MQ更新消息,获取Redis中更新后的元数据详情,解析Json字符串并构建缓存和二级索引Bean,最后一起放入本地缓存LocalCache中。

增量更新过程与全量更新类似,不同点在于触发点和增量更新逻辑表详情和二级索引的过程。

img

2.增量更新的触发点梳理

目前主要是逻辑表配置页面接口的更新入口处,发送对应的增量更新的消息;对于水下能力带来的数据库数据更新,保持现状,依靠5分钟全量更新。

入口 环境 消息类型 分类 说明 入口类型
(“/logic/add”) 预发 新增 水上 逻辑表-新增,新版需发布,旧版不走xbp直接发布 http
(“/logic/delete”) 预发 删除 水上 逻辑表-删除,当前环境和线上全删除 http
(“/logic/edit”) 预发 更新 水上 逻辑表-编辑,当前环境全量更新,线上环境只更新逻辑表Meta http
(“/logic/release”)
–add:Response<Map<String, Object>> logicTableAdd 线上 新增 水上 线上add逻辑表 JSF
–edit:Response<Map<String, Object>> logicTableUpdate 线上 更新 水上 线上update逻辑表 JSF
–delete:Response<Map<String, Object>> deleteLogicTable 线上 删除 水上 线上删除逻辑表 JSF

为了防止元数据缓存更新失败,在每个触发点发完MQ消息之后,将一个标志变量放入ThreadLocal中,然后在需要改动数据库表的service或Mapper处检查这个标志变量,如果存在标志变量说明此次变更已经发过MQ消息了,符合预期;如果不存在标志变量说明此次变更未发过MQ消息,上述变更入口没有覆盖到这次变更,报警处理。

3.如何在增量更新时发送消息

要想实现实时增量更新,需要在BE的数据变动时,发送一条增量更新的mq消息,目前元数据中心有对应接口提供:

调用元数据中心包中的接口

1
2
3
4
5
6
7
8
9
10
/**
* 接口会自动发出一个元数据更新的mq
* @param metaType 更新的元数据类型
* @param realTimeRefreshMsgType 更新的type : CHANGE / DELETE
* @param envType 更新的环境类型 ALL/PRE/PROD ?
* @param idList 发生变更的元数据id list
* @param topic 指定发送的topic
* @return
*/
public String dataChangeMessage(MetaType metaType, RealTimeRefreshMsgType realTimeRefreshMsgType, EnvType envType, List<Long> idList, String topic);

需要配置一个jmq 的producer来满足发送jmq的需求:

1
2
3
4
5
6
7
8
9
10
jmq:
enabled: true
producers:
metaDataRealTimeRefreshProducer: --producerName要保持一致
password: password
app: dataassetmeta
address: nameserver.jmq.jd.local:80
enabled: true
policy:
retryTimes: 3

4.增量更新逻辑表详情和二级索引

因为二级索引的更新较为复杂,所以展开叙述:

二级索引是一个Map,键是元数据的id,值是Relation对象列表,如:Map<Long, List<DriveLogicTableDecorateRelationPO>>,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 指标id->逻辑表RelationPO 二级索引
*/
private final Cache<Long, List<DriveLogicTableMetricRelationPO>> metricLogicTableRelationCache = CacheBuilder.newBuilder()
.maximumSize(MAXIMUM_SIZE)//注:Cache指定maximumSize后,它不会立即占用全部指定的内存大小,而是会根据需要逐渐扩展。
.concurrencyLevel(CONCURRENCY_LEVEL)
.recordStats()
.build();

/**
* 维度id->逻辑表RelationPO 二级索引
*/
private final Cache<Long, List<DriveLogicTableFieldRelationPO>> dimensionLogicTableRelationCache = CacheBuilder.newBuilder()
.maximumSize(MAXIMUM_SIZE)
.concurrencyLevel(CONCURRENCY_LEVEL)
.recordStats()
.build();

/**
* 修饰id->逻辑表RelationPO 二级索引
*/
private final Cache<Long, List<DriveLogicTableDecorateRelationPO>> decorateLogicTableRelationCache = CacheBuilder.newBuilder()
.maximumSize(MAXIMUM_SIZE)//这个值分开定义
.concurrencyLevel(CONCURRENCY_LEVEL)
.recordStats()
.build();

更新流程:

img

更新二级索引流程举例:

拿指标举例:原有的本地缓存中有逻辑表L1,L2,二级索引:M1->[R(L1),R(L2)]

分类 本地缓存内容 增量拉取内容 动作 新的本地缓存
删除(删除逻辑表) 逻辑表L1,L2;二级索引:M1->[R(L1),R(L2)] id详情:L1 删除L1相关的二级索引,即删除二级索引:M1->[R(L1)] 逻辑表L1,L2;二级索引:M1->[R(L2)]
更新(新增R) 逻辑表L1,L2;二级索引:M1->[R(L1),R(L2)] id详情:L1R:L1的RList中有M1、M2 查询L1中所有涉及到的二级索引,删除L1相关的所有二级索引,即删除二级索引:M1->[R(L1)];然后新增由L1构建的新二级索引(用add,在旧的缓存基础上添加) 逻辑表L1,L2;二级索引:删后变为:M1->[R(L2)],增完最后变为:M1->[R(L1),R(L2)]M2->[R(L1)]
更新(删除R) 逻辑表L1,L2;二级索引:M1->[R(L1),R(L2)] id详情:L1R:L1的RList中无M1 逻辑表L1,L2;二级索引:删后变为:M1->[R(L2)],增完最后变为:M1->[R(L2)]
更新(更新R) 逻辑表L1,L2;二级索引:M1->[R(L1),R(L2)] id详情:L1R:L1的RList中还是M1 逻辑表L1,L2;二级索引:删后变为:M1->[R(L2)],增完最后变为:M1->[R’(L1),R(L2)]

获取本地非idList的R对象举例(删除二级索引中idList相关项,以单逻辑表id、修饰场景举例):

1
2
3
4
5
6
// 仅获取本地缓存中,非目标逻辑表的相关关系
List<DriveLogicTableDecorateRelationPO> decorateRelationPOList = getDecorateRelationByIds(decorateIds).stream()
.filter(dr -> dr.getDriveLogicTableId() != metaDriveLogicTableInfo.getId()).collect(toList());
// 补充变更逻辑表的修饰关系
decorateRelationPOList.addAll(metaDriveLogicTableInfo.getDecorateRelationList().stream()
.map(d -> convert2DriveLogicTableDecorateRelationPO(d)).collect(toList()));

5.补充

a、增量更新上线后,每5分钟的全量更新可以延长更新时间(如半小时),作为增量更新的一个兜底。

b、缓存全量更新和增量更新,都需要添加ducc开关,用于控制总的全量和增量更新开关,也要有不使用缓存的开关,防止缓存更新链路出问题或者缓存数据有误,需要快速恢复。

c、增量更新如果失败了之后的处理方式:更新失败报警报出来,然后重试(更新操作幂等)。

d、确认一下,消息消费是不是顺序的?–不是。

e、Cache目前是全量–>后续转化为LRU–>有重复开发–>拿元数据兜底而非BE,但是元数据没有二级索引。