除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`type` tinyint(4) NOT NULL COMMENT '业务类型:1-下单',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态:1-待处理,2-已处理,3-预警',
`data` varchar(255) NOT NULL COMMENT '信息',
`retry_count` tinyint(4) DEFAULT '0' COMMENT '重试次数',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识:0-不删除;1-删除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
处理订单service,这里可以用到我们之前说过的装饰器模式,去装饰这个service。把保存本地事务,发送mq消息,交给装饰器类去做,而service只需要关心业务逻辑即可,也符合开闭原则。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 10:58
* @description
*/
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {
private final OrderMapper orderMapper;
/**
* 订单处理方法:只处理订单关联逻辑
* @param o
* @return
*/
@Override
public Order handle(Object o) {
// 订单信息
Order order = Order.builder()
.orderNo("2345678")
.createTime(LocalDateTime.now())
.userId(1)
.insuranceAmount(new BigDecimal(2000000))
.orderAmount(new BigDecimal(5000))
.build();
orderMapper.insert(order);
return order;
}
}
新增OrderService
的装饰类OrderServiceDecorate
,负责对订单逻辑的扩展,这里是添加本地事务消息,以及发送MQ信息,扩展方法添加了Transactional
注解,确保订单逻辑和本地事务消息的数据在同一个事务中进行,确保原子性。其中事务消息标记处理中,待下游服务处理完业务逻辑,再更新处理完成。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/14 18:48
* @description
*/
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {
private final NoticeMessageMapper noticeMessageMapper;
private final RabbitTemplate rabbitTemplate;
/**
* 装饰方法:对订单处理逻辑进行扩展
* @param o
* @return
*/
@Override
@Transactional
public Object handle(Object o) {
// 调用service方法,实现保单逻辑
Order order = (Order) service.handle(o);
// 扩展:1、保存事务消息,2、发送MQ消息
// 本地事务消息
String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
NoticeMessage noticeMessage = NoticeMessage.builder()
.retryCount(0)
.data(data)
.status(1)
.type(1)
.createTime(LocalDateTime.now())
.build();
noticeMessageMapper.insert(noticeMessage);
// 发送mq消息
log.info("发送mq消息....");
rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
return null;
}
}
关于这个装饰者模式,之前有讲到过,可以看下之前发布的内容。
下游服务监听消息,处理完自己的业务逻辑后(如:业绩、分润、晋升等),需要发送MQ,上游服务监听消息,更新本地事务状态为已处理。这需要注意的是下游服务需要做幂等处理,防止异常情况下,上游服务数据的重试。
/**
* @author 往事如风
* @version 1.0
* @date 2022/12/13 18:07
* @description
*/
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
public void orderHandler(String msg) {
log.info("监听到订单消息:{}", msg);
// 需要注意幂等,幂等逻辑
log.info("下游服务业务逻辑。。。。。");
JSONObject json = JSONUtil.parseObj(msg);
rabbitTemplate.convertAndSend("trans", "trans.update.