;
其中 addresses
是集群的ip端口列表。例如 "192.168.1.1:5672,192.168.1.2:5672,192.168.1.3:5672"
发送消息确认机制
RabbitMQ 发送消息确认机制是:生产者到交换机确认、交换机到对列确认
消息发送过程是 生产者 -> 交换机 -> 对列。消息丢失可能发生在生产者到交换机之间,也可能发生在交换机发队列之间。为保证消息可靠发送,需要在上述链路进行消息确认。
确认机制配置
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); // 发送到交换机确认
connectionFactory.setPublisherReturns(true); // 发送到对列确认
确认消息回调处理
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true); // 设置确认消息回调
template.setMessageConverter(jsonConverter());
// 发送到交换机确认消息回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("confirm message error! correlationData: {}, cause: {}", correlationData, cause);
}
});
// 发送到对列确认消息回调
template.setReturnsCallback((msg) -> {
String content = new String(msg.getMessage().getBody(), StandardCharsets.UTF_8);
String message = MessageFormatter.format("send msg error, routing key:{}, msg:{}", msg.getRoutingKey(), content).getMessage();
System.out.println(message);
});
此处对于异常消息仅进行打印处理,生产上需要告警、重试等策略。