【二三方】_基于线程池的直充接口异步响应

基于线程池的直充接口异步响应

基于线程池的直充接口异步响应

由于拼多多要求虚拟产品直充接口在500ms内返回响应,所以采用异步响应的模式:创建任务,用于调用后续权益下发等接口,并将该任务提交到线程池中。

1.创建实现Runnable接口的任务类

重写run()方法,完成后续权益下发等逻辑,并回调拼多多的接口告诉拼多多充值成功与否。多线程Runable任务的参数一般是以对象实例对象的形式配置的,通过构造方法来输入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 异步处理线程
* {@link com.youku.crmtoali.service.activity.impl.PddActivityServiceImpl}
*/
@Data
class YkPddOrderVideoTask implements Runnable {

private Object ctx;
private PddOrderVideoDTO pddOrderVideoDTO;
private ActivityConfigDO activityConfig;

public YkPddOrderVideoTask(Object ctx, PddOrderVideoDTO pddOrderVideoDTO, ActivityConfigDO activityConfig) {
this.ctx = ctx;
this.pddOrderVideoDTO = pddOrderVideoDTO;
this.activityConfig = activityConfig;
}

@Override
public void run() {
// 鹰眼链路
EagleEye.setRpcContext(ctx);
try {
// 1.开始做异步直充逻辑
IActivityBizService iActivityBizService = map.get(activityConfig.getActivityType());
ResponseDTO responseDTO = iActivityBizService.participateActivityBiz(activityConfig.getActivityCode(),
UserTypeEnum.PHONE.name(), pddOrderVideoDTO.getChargeNo(), pddOrderVideoDTO.getOutOrderNo(), null);

// 2.直充日志打印
Boolean success = responseDTO.isSuccess();
LogPrintUtil.printStoreInfo(log, StoreConst.STORE_RECHARGE_LOG, "YkPddOrderVideoHsfServiceImpl", "YkPddOrderVideoTask",
StoreConst.PDD_LOG_CHANNEL, pddOrderVideoDTO.getOutOrderNo(), success ? "直充成功" : "直充失败",
JSONObject.toJSONString(pddOrderVideoDTO), null);

// 3.Pdd回调通知
callbackPdd(pddOrderVideoDTO, success);
} catch (Exception e) {
LogPrintUtil.printStoreError(log, StoreConst.STORE_INFO_LOG, "YkPddOrderVideoHsfServiceImpl", "YkPddOrderVideoTask",
StoreConst.PDD_LOG_CHANNEL, pddOrderVideoDTO.getOutOrderNo(), "异步线程异常",
PddMsgEnum.PDD_SERVICE_ERROR.getCode(), JSONObject.toJSONString(pddOrderVideoDTO), null, e);
} finally {
EagleEye.clearRpcContext();
}
}
}

2.创建线程池管理类

创建线程池管理类,在静态代码块中创建几个线程池,用于进行响应的异步工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ThreadPoolManager {

private static Map<PoolTypeEnum, ThreadPoolExecutor> poolManagerMap = new ConcurrentHashMap<>(1 << 5);

/**
* 线程工厂
*/
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("STORE-YK-PROCESSOR-%d").build();

public enum PoolTypeEnum {
/**
* io 密集型任务
*/
IO_TASK,
/**
* 订单任务使用线程
*/
STORE,
/**
* 失败订单重试定时任务线程池
*/
FAILED_TASK;
}

static {
poolManagerMap.put(PoolTypeEnum.IO_TASK,
new ThreadPoolExecutor(50, 100, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1 << 16),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()));
poolManagerMap.put(PoolTypeEnum.STORE,
new ThreadPoolExecutor(20, 50, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1 << 16),
THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy()));
poolManagerMap.put(PoolTypeEnum.FAILED_TASK,
new ThreadPoolExecutor(20, 50, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1 << 16),
THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy()));
}

/**
* 默认线程池
*/
private static final ThreadPoolExecutor DEFAULT_THREAD_POOL = new ThreadPoolExecutor(50, 100, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1 << 16),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

/**
* 获取线程池方法
*
* @param poolTypeEnum
* @return
*/
public static ThreadPoolExecutor getExecutor(PoolTypeEnum poolTypeEnum) {
ThreadPoolExecutor threadPoolExecutor = poolManagerMap.get(poolTypeEnum);
if (Objects.isNull(threadPoolExecutor)) {
return DEFAULT_THREAD_POOL;
}
return threadPoolExecutor;
}
}

3.在直充接口中将新任务放入线程池中

在提供给拼多多的直充接口中,添加将新任务放入线程池中的逻辑,然后直接返回响应成功,后续权益发放、回调通知的逻辑交给线程池中的线程去做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 3.异步线程处理
ThreadPoolExecutor executor = ThreadPoolManager.getExecutor(ThreadPoolManager.PoolTypeEnum.STORE);
if (Objects.isNull(executor)) {
LogPrintUtil.printStoreError(log, StoreConst.STORE_INFO_LOG, "YkPddOrderVideoHsfServiceImpl", "createOrder",
StoreConst.PDD_LOG_CHANNEL, pddOrderVideoDTO.getOutOrderNo(), "异步任务未获取到对应的线程池",
PddMsgEnum.PDD_SERVICE_ERROR.getCode(), JSONObject.toJSONString(pddOrderVideoDTO), null, null);
return PddResponseDTO.error(PddMsgEnum.PDD_SERVICE_SUCCESS, result);
}

try {
executor.execute(new YkPddOrderVideoTask(EagleEye.getRpcContext(), pddOrderVideoDTO, activityConfig));
} catch (Exception e) {
// 线程拒绝了策略,需要通知拼多多,让他们执行重试策略。
LogPrintUtil.printStoreError(log, StoreConst.STORE_INFO_LOG, "YkPddOrderVideoHsfServiceImpl", "createOrder",
StoreConst.PDD_LOG_CHANNEL, pddOrderVideoDTO.getOutOrderNo(), "线程池拒绝策略",
PddMsgEnum.PDD_SERVICE_ERROR.getCode(), JSONObject.toJSONString(pddOrderVideoDTO), null, null);
return PddResponseDTO.error(PddMsgEnum.PDD_SERVICE_SUCCESS, result);
}

// 4.操作结果回传Pdd
result.setStatus(PDD_ACCEPT);
return PddResponseDTO.error(PddMsgEnum.PDD_SERVICE_SUCCESS, result);

基于策略模式的数据库失败订单轮询处理

使用定时任务查询数据库,查询失败订单;使用设计工厂模式和策略模式,针对订单状态字段的不同值,调用不同的工厂实例执行抽象方法的不同实现逻辑。之前介绍过借助Spring依赖注入来创建工厂的形式,这里则是采用applicationContext.getBean(Class<T> var1)方法通过Class类来获取实例bean的工厂。

1.创建实例bean接口

先创建一个接口,后续的工厂对象类都是该接口的实现类。

1
2
3
4
5
6
7
8
public interface FailedOrderStrategy {

/**
* 失败订单处理策略
* @param activityRecordList
*/
void doDealFailedOrder(List<ActivityRecordDO> activityRecordList);
}

2.创建工厂对象类

创建工厂对象类,并将它们放入Spring容器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Service
@Slf4j
public class PddUnYkRechargeOrder implements FailedOrderStrategy {

@Autowired
private YkPddOrderVideoHsfService ykPddOrderVideoHsfService;

@Autowired
private StoreRetryRecordMapper storeRetryRecordMapper;

@Override
public void doDealFailedOrder(List<ActivityRecordDO> activityRecordList) {
for (ActivityRecordDO activityRecordDO : activityRecordList) {
// 日志打印
LogPrintUtil.printStoreInfo(log, StoreConst.STORE_TASK_LOG, "PddUnYkRechargeOrder", "doDealFailedOrder",
StoreConst.PDD_LOG_CHANNEL, activityRecordDO.getOutOrderId(), "PDD处理未直充订单",
JSONObject.toJSONString(activityRecordDO), null);

// 事务补偿防止情况:订单已重试成功,然后error记录删除失败
if (PDD_RECHARGE_ERROR_NOTICE_SUCCESS.getStatus().equals(activityRecordDO.getResult())
|| PDD_RECHARGE_SUCCESS_NOTICE_SUCCESS.getStatus().equals(activityRecordDO.getResult())) {
storeRetryRecordMapper.deleteByOutOrderId(activityRecordDO.getOutOrderId());
}

ThreadPoolExecutor executor = ThreadPoolManager.getExecutor(ThreadPoolManager.PoolTypeEnum.FAILED_TASK);
if (Objects.isNull(executor)) {
log.error("YkPddOrderVideoHsfServiceImpl task error:executor is null");
}
PddOrderVideoDTO pddOrderVideoDTO = PddOrderVideoDTO.builder()
.chargeNo(activityRecordDO.getUserId())
.outOrderNo(activityRecordDO.getOutOrderId())
.build();
if (JsonUtil.isJson(activityRecordDO.getBenefitLog())) {
pddOrderVideoDTO = JSONObject.parseObject(activityRecordDO.getBenefitLog(), PddOrderVideoDTO.class);
}
ykPddOrderVideoHsfService.rechargeYkBenefit(pddOrderVideoDTO, activityRecordDO.getActivityCode());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
@Slf4j
public class PddUnCallBackOrder implements FailedOrderStrategy {


@Autowired
private YkPddOrderVideoHsfService ykPddOrderVideoHsfService;

@Autowired
private StoreRetryRecordMapper storeRetryRecordMapper;

@Override
public void doDealFailedOrder(List<ActivityRecordDO> activityRecordList) {
for (ActivityRecordDO activityRecordDO : activityRecordList) {
// 日志打印
LogPrintUtil.printStoreInfo(log, StoreConst.STORE_TASK_LOG, "PddUnCallBackOrder", "doDealFailedOrder",
StoreConst.PDD_LOG_CHANNEL, activityRecordDO.getOutOrderId(), "PDD处理未回调订单",
JSONObject.toJSONString(activityRecordDO), null);

// 事务补偿防止情况:订单已重试成功,然后error记录删除失败
if (PDD_RECHARGE_ERROR_NOTICE_SUCCESS.getStatus().equals(activityRecordDO.getResult())
|| PDD_RECHARGE_SUCCESS_NOTICE_SUCCESS.getStatus().equals(activityRecordDO.getResult())) {
storeRetryRecordMapper.deleteByOutOrderId(activityRecordDO.getOutOrderId());
}
PddOrderVideoDTO pddOrderVideo = PddOrderVideoDTO.builder()
.outOrderNo(activityRecordDO.getOutOrderId())
.chargeNo(activityRecordDO.getUserId())
.build();
if (JsonUtil.isJson(activityRecordDO.getBenefitLog())) {
pddOrderVideo = JSONObject.parseObject(activityRecordDO.getBenefitLog(), PddOrderVideoDTO.class);
}
// 直充失败未通知的处理方式
if (PDD_RECHARGE_ERROR_UN_NOTICE.getStatus().equals(activityRecordDO.getResult())) {
ykPddOrderVideoHsfService.callbackPdd(pddOrderVideo, false);
}
// 直充成功未通知的处理方式
if (PDD_RECHARGE_SUCCESS_UN_NOTICE.getStatus().equals(activityRecordDO.getResult())) {
ykPddOrderVideoHsfService.callbackPdd(pddOrderVideo, true);
}
}
}
}

3.创建枚举类

创建枚举类,以工厂对象类的Class对象作为枚举的一个属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public enum StoreOrderStatusEnum {

/**
* PDD:落库未处理
*/
PDD_IN_DB_UN_RECHARGE("PDD1", "落库未处理",PddUnYkRechargeOrder.class),
/**
* PDD:充值失败未通知
*/
PDD_RECHARGE_ERROR_UN_NOTICE("PDD2", "充值失败未通知", PddUnCallBackOrder.class),
/**
* PDD:充值成功未通知
*/
PDD_RECHARGE_SUCCESS_UN_NOTICE("PDD3", "充值成功未通知", PddUnCallBackOrder.class),
/**
* PDD:通知失败
*/
PDD_RECHARGE_NOTICE_ERROR("PDD4", "通知失败",PddUnCallBackOrder.class),
/**
* PDD:直充失败已通知
*/
PDD_RECHARGE_ERROR_NOTICE_SUCCESS("PDD5", "直充失败已通知",null),
/**
* ODD:直充成功已通知
*/
PDD_RECHARGE_SUCCESS_NOTICE_SUCCESS("PDD6", "直充成功已通知",null),

/**
* JD:订单状态
*/
JD_ORDER_SUCCESS("0","充值成功",null),
JD_ORDER_PROCESS("1","充值中",JdUnYkRechargeOrder.class),
JD_ORDER_FAILED("2","充值失败",null)
;

private String status;
private String desc;
private Class<? extends FailedOrderStrategy> beanClass;

StoreOrderStatusEnum(String status, String desc, Class<? extends FailedOrderStrategy> beanClass) {
this.status = status;
this.desc = desc;
this.beanClass = beanClass;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public Class<? extends FailedOrderStrategy> getBeanClass() {
return beanClass;
}

public void setBeanClass(Class<? extends FailedOrderStrategy> beanClass) {
this.beanClass = beanClass;
}
}

4.创建工厂类

创建工厂类,该工厂可以根据输入参数找到对于的枚举类,然后取出枚举类中的工厂对象bean,使用其方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Component
@Slf4j
public class FailedOrderFactory {

private static final Map<String, StoreOrderStatusEnum> YK_STORE_ORDER_STATUS_ENUM_MAP = new HashMap<>();

static {
EnumUtils.getEnumList(StoreOrderStatusEnum.class).forEach(item -> YK_STORE_ORDER_STATUS_ENUM_MAP.put(item.getStatus(), item));
}

@Autowired
private ApplicationContext applicationContext;

@Autowired
private ActivityRecordMapper activityRecordMapper;

public PddResponseDTO<StoreRetryRecordDO> doDealFailedOrder(StoreRetryRecordDO storeRetryRecord) {
// 日志打印
LogPrintUtil.printStoreInfo(log, StoreConst.STORE_TASK_LOG, "FailedOrderFactory", "doDealFailedOrder",
storeRetryRecord.getType(), storeRetryRecord.getOutOrderId(), "定时任务处理订单入口",
JSONObject.toJSONString(storeRetryRecord), null);

// 获取对应的处理策略
StoreOrderStatusEnum storeOrderStatusEnum = YK_STORE_ORDER_STATUS_ENUM_MAP.get(storeRetryRecord.getUpdateStatus());
if (Objects.isNull(storeOrderStatusEnum) || Objects.isNull(storeOrderStatusEnum.getBeanClass())) {
return PddResponseDTO.error("请检查result参数是否正确", storeRetryRecord);
}

// 查询失败订单
List<ActivityRecordDO> activityRecordList = activityRecordMapper.query(ActivityRecordQuery
.builder()
.userId(storeRetryRecord.getOpenId())
.outOrderId(storeRetryRecord.getOutOrderId())
.result(storeRetryRecord.getUpdateStatus())
.build());
if (CollectionUtils.isEmpty(activityRecordList)) {
LogPrintUtil.printStoreError(log, StoreConst.STORE_TASK_LOG, "StoreFailedOrderRetryJob", "process",
null, null, "查询不到指定失败订单",
null, JSONObject.toJSONString(storeRetryRecord), null, null);
return PddResponseDTO.error("暂无失败订单", storeRetryRecord);
}

// 业务处理,找到状态对应的业务策略
FailedOrderStrategy failedOrderStrategy = applicationContext.getBean(storeOrderStatusEnum.getBeanClass());
failedOrderStrategy.doDealFailedOrder(activityRecordList);
return PddResponseDTO.success(storeRetryRecord);
}
}

5.在定时任务中使用工厂对象

在定时任务中查询数据库,根据失败订单的状态字段,选择不同的工厂对象,执行对于的业务策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Component
@Slf4j(topic = "third")
public class StoreFailedOrderRetryJob extends JavaProcessor {

@Autowired
private StoreRetryRecordMapper storeRetryRecordMapper;

@Autowired
private FailedOrderFactory pddFailedOrderFactory;

@Override
public ProcessResult process(JobContext context) throws Exception {
try {
String params = context.getJobParameters();
StoreRetryRecordQuery storeRetryRecordQuery;
if (StringUtils.isNotBlank(params)) {
storeRetryRecordQuery = JSONObject.parseObject(params, StoreRetryRecordQuery.class);
} else {
storeRetryRecordQuery = StoreRetryRecordQuery
.builder()
.retryTimesLimit(PddBaseConst.PDD_RETRY_TIME)
.startTime(new Date(System.currentTimeMillis() - 5 * 60 * 1000))
.pageSize(100)
.build();
}

// 查询失败订单
List<StoreRetryRecordDO> storeRetryRecordList = storeRetryRecordMapper.select(storeRetryRecordQuery);
if (CollectionUtils.isEmpty(storeRetryRecordList)) {
// 日志打印
LogPrintUtil.printStoreInfo(log, StoreConst.STORE_TASK_LOG, "StoreFailedOrderRetryJob", "process",
null, null, "定时任务查询无失败订单",
JSONObject.toJSONString(storeRetryRecordQuery), null);
return new ProcessResult(true);
}
for (StoreRetryRecordDO storeRetryRecord : storeRetryRecordList) {
// 调用失败处理逻辑
pddFailedOrderFactory.doDealFailedOrder(storeRetryRecord);
storeRetryRecordMapper.addRetryTimes(storeRetryRecord.getId(), 1);
}
return new ProcessResult(true);
} catch (Exception e) {
LogPrintUtil.printStoreError(log, StoreConst.STORE_TASK_LOG, "StoreFailedOrderRetryJob", "process",
null, null, "定时任务异常",
null, null, null, null);
}
return new ProcessResult(true);
}
}