order.queue.key", json.getInt("id"));
}
}
这里插个题外话,关于幂等的处理,我这里大致有两种思路
1、比如根据订单号查一下记录是否存在,存在就直接返回成功。
2、redis存一个唯一的请求号,处理完再删除,不存在请求号的直接返回成功,可以写个AOP去处理,与业务隔离。
言归正传,上游服务消息监听,下游发送MQ消息,更新本地事务消息为已处理,分布式事务流程结束。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 18:29
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {
@Autowired
private NoticeMessageMapper noticeMessageMapper;
@RabbitHandler
public void updateOrder(Integer msgId) {
log.info("监听消息,更新本地事务消息,消息id:{}", msgId);
NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
noticeMessageMapper.updateById(msg);
}
}
存在异常情况时,会通过定时任务,轮询的往MQ中发送消息,尽最大努力去让下游服务达到数据一致,当然重试也要设置上限;若达到上限以后还一直是失败,那不得不考虑是下游服务自身存在问题了(有可能就是代码逻辑存在问题)。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/14 10:25
* @description
*/
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {
private final RabbitTemplate rabbitTemplate;
private final NoticeMessageMapper noticeMessageMapper;
/**
* 最大自动重试次数
*/
private final Integer MAX_RETRY_COUNT = 5;
@Scheduled(cron = "0/20 * * * * ? ")
public void retry() {
log.info("定时任务,重试异常订单");
LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
wrapper.eq(NoticeMessage::getStatus, 1);
List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
for (NoticeMessage noticeMessage : noticeMessages) {
// 重新发送mq消息
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
// 重试次数+1
noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
noticeMessageMapper.updateById(noticeMessage);
// 判断重试次数,等于最长限制次数,直接更新为报警状态
if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
noticeMessage.setStatus(3);
noticeMessageMapper.updateById(noticeMessage);
// 发送告警,通知对应人员
// 告警逻辑(短信、邮件、企微群,等等)....
}
}
}
}
其实这里有个问题,一个上游服务对应多个下游服务的时候。这个时候往往不能存一条本地消息记录。
- 这里可以在消息表多加个字段next_server_count,表示一个订单发起方,需要调用的下游服务数量。上游服务监听的时候,每次会与下游的回调都减去1,直到数值是0的时候,再更新状态是已处理。但是要控制并发,这个字段是被多个下游服务共享的。
- 还有一种处理方案是为每个下游服务,都记录一条事务消息,用type字段去区分,标记类型。实现上游和下游对于事务消息的一对一关系。
- 最后,达到最大重试次数以后,可以将消息加入到一个告警列表,这个告警列表可以展示在管理后台或其他监控系统中,展示一些必要的信息,去供公司内部人员去人工介入,处理这种异常的数据,使得数据达到最终一致性。
四、总结
其实分布式事务没有一个完美的处理方案,只能说是尽量去满足业务需求,满足数据一致。如果程序不能处理了,最后由人工去兜底,做数据的补偿方案。
五、参考源码
编程文档:
https://gitee.com/cicadasmile/butte-java-note
应用仓库:
https://gitee.com/cicadasmile/butte-flyer-parent