设为首页 加入收藏

TOP

Java SpringBoot集成RabbitMQ实战和总结(四)
2018-10-10 04:11:04 】 浏览:1393
Tags:Java SpringBoot 集成 RabbitMQ 战和 总结
Mq,RabbitMq接收了但是存入磁盘之前服务器就挂了,消息也就丢了。为了保证消息的投递有两种解决方案,最保险的就是事务(和DB的事务没有太大的可比性), 但是因为事务会极大的降低性能,会导致生产者和RabbitMq之间产生同步(等待确认),这也违背了我们使用RabbitMq的初衷。所以一般很少采用,这就引入第二种方案:发送者确认模式。


发送者确认模式是指发送方发送的消息都带有一个id,RabbitMq会将消息持久化到磁盘之后通知生产者消息已经成功投递,如果因为RabbitMq内部的错误会发送ack。注意这里的发送者和RabbitMq之间是异步的,所以相较于事务机制性能大大提高。其实很多操作都是不能保证绝对的百分之一百的成功,哪怕采用了事务也是如此,可靠性和性能很多时候需要做一些取舍,想很多互联网公司吹嘘的5个9,6个9也是一样的道理。如果不是重要的消息性能计数器,完全可以不采用发送者确认模式。


这里有一点我当时纠结了很久,我一直以为发送者确认模式的回调是客户端的ack触发的,这里是大大的误解!发送者确认模式和消费者没有一点关系,消费者确认也和发送者没有一点关系,两者都是在和RabbitMq打交道,发送者不会管消费者有没有收到,只要消息到了RabbitMq并且已经持久化便会通知生产者,这个ack是RabbitMq本身发出的,和消费者无关


发送者确认模式需要将Channel设置成Confirm模式,这样才会收到通知。Spring中需要将连接设置成Confirm模式:


connectionFactory.setPublisherConfirms(isConfirm);
然后在RabbitTemplate中设置确认的回调,correlationData是消息的id,如下(只是简单打印下):


// 设置RabbitTemplate每次发送消息都会回调这个方法
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause)
        -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));


发送时需要给出唯一的标识(CorrelationData):


rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY,
            new ExampleEvent(i, "confirm message id:" + i),
            new CorrelationData(Integer.toString(i)));


还有一个参数需要说下:mandatory。这个参数为true表示如果发送消息到了RabbitMq,没有对应该消息的队列。那么会将消息返回给生产者,此时仍然会发送ack确认消息。


设置RabbitTemplate的回调如下:


rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)
-> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外如果是RabbitMq内部的错误,不会调用该方法。所以如果消息特别重要,对于未确认的消息,生产者应该在内存用保存着,在确认时候根据返回的id删除该消息。如果是nack可以将该消息记录专门的日志或者转发到相应处理的逻辑进行后续补偿。RabbitTemplate也可以配置RetryTemplate,发送失败时直接进行重试,具体还是要结合业务。


最后关于发送者确认需要提的是spring,因为spring默认的Bean是单例的,所以针对不同的确认方案(其实有不同的确认方案是比较合理的,很多消息不需要确认,有些需要确认)需要配置不同的bean.


消费消息、死信队列和RetryTemplate
上面也提到了如果消费者抛出异常时默认的处理逻辑。另外我们还可以给消费者配置RetryTemplate,如果是采用SpringBoot的话,可以在application.yml配置中配置如下:


spring:
rabbitmq:
listener:
retry:


重试次数


      max-attempts: 3
    #  开启重试机制
      enabled: true


如上,如果消费者失败的话会进行重试,默认是3次。注意这里的重试机制RabbitMq是为感知的!到达3次之后会抛出异常调用MessageRecoverer。默认的实现为RejectAndDontRequeueRecoverer,也就是打印异常,发送nack,不会重新入队列。
我想既然配置了重试机制消息肯定是很重要的,消息肯定不能丢,仅仅是日志可能会因为日志滚动丢失而且信息不明显,所以我们要讲消息保存下来。可以有如下这些方案:


使用RepublishMessageRecoverer这个MessageRecoverer会发送发送消息到指定队列
给队列绑定死信队列,因为默认的RepublishMessageRecoverer会发送nack并且requeue为false。这样抛出一场是这种方式和上面的结果一样都是转发到了另外一个队列。详见DeadLetterConsumer
注册自己实现的MessageRecoverer
给MessageListenerContainer设置RecoveryCallback
对于方法手动捕获异常,进行处理
我比较推荐前两种。这里说下死信队列,死信队列其实就是普通的队列,只不过一个队列声明的时候指定的属性,会将死信转发到该交换器中。声明死信队列方法如下:


@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
            durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
        value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
&n

首页 上一页 1 2 3 4 5 6 下一页 尾页 4/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java程序语言的后门-反射机制 下一篇Java Bean与Map之间相互转化的实现

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目