设为首页 加入收藏

TOP

分布式事务解决方案(三)
2023-07-25 21:44:05 】 浏览:80
Tags:解决方
order.queue.key", json.getInt("id")); } }

这里插个题外话,关于幂等的处理,我这里大致有两种思路
1、比如根据订单号查一下记录是否存在,存在就直接返回成功。
2、redis存一个唯一的请求号,处理完再删除,不存在请求号的直接返回成功,可以写个AOP去处理,与业务隔离。
言归正传,上游服务消息监听,下游发送MQ消息,更新本地事务消息为已处理,分布式事务流程结束。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/13 18:29
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {

    @Autowired
    private NoticeMessageMapper noticeMessageMapper;

    @RabbitHandler
    public void updateOrder(Integer msgId) {
        log.info("监听消息,更新本地事务消息,消息id:{}", msgId);
        NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
        noticeMessageMapper.updateById(msg);
    }
}

存在异常情况时,会通过定时任务,轮询的往MQ中发送消息,尽最大努力去让下游服务达到数据一致,当然重试也要设置上限;若达到上限以后还一直是失败,那不得不考虑是下游服务自身存在问题了(有可能就是代码逻辑存在问题)。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/14 10:25
 * @description
 */
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {

    private final RabbitTemplate rabbitTemplate;

    private final NoticeMessageMapper noticeMessageMapper;

    /**
     * 最大自动重试次数
     */
    private final Integer MAX_RETRY_COUNT = 5;

    @Scheduled(cron = "0/20 * * * * ? ")
    public void retry() {
        log.info("定时任务,重试异常订单");
        LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
        wrapper.eq(NoticeMessage::getStatus, 1);
        List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
        for (NoticeMessage noticeMessage : noticeMessages) {
            // 重新发送mq消息
            rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
            // 重试次数+1
            noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
            noticeMessageMapper.updateById(noticeMessage);
            // 判断重试次数,等于最长限制次数,直接更新为报警状态
            if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {
                noticeMessage.setStatus(3);
                noticeMessageMapper.updateById(noticeMessage);
                // 发送告警,通知对应人员
                // 告警逻辑(短信、邮件、企微群,等等)....
            }
        }
    }
}

其实这里有个问题,一个上游服务对应多个下游服务的时候。这个时候往往不能存一条本地消息记录。

  1. 这里可以在消息表多加个字段next_server_count,表示一个订单发起方,需要调用的下游服务数量。上游服务监听的时候,每次会与下游的回调都减去1,直到数值是0的时候,再更新状态是已处理。但是要控制并发,这个字段是被多个下游服务共享的。
  2. 还有一种处理方案是为每个下游服务,都记录一条事务消息,用type字段去区分,标记类型。实现上游和下游对于事务消息的一对一关系。
  3. 最后,达到最大重试次数以后,可以将消息加入到一个告警列表,这个告警列表可以展示在管理后台或其他监控系统中,展示一些必要的信息,去供公司内部人员去人工介入,处理这种异常的数据,使得数据达到最终一致性。

四、总结

其实分布式事务没有一个完美的处理方案,只能说是尽量去满足业务需求,满足数据一致。如果程序不能处理了,最后由人工去兜底,做数据的补偿方案。

五、参考源码

编程文档:
https://gitee.com/cicadasmile/butte-java-note

应用仓库:
https://gitee.com/cicadasmile/butte-flyer-parent
首页 上一页 1 2 3 下一页 尾页 3/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇三十分钟入门基础Go(Java小子版) 下一篇集合 P1 集合

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目