设为首页 加入收藏

TOP

Java SpringBoot集成RabbitMQ实战和总结(三)
2018-10-10 04:11:04 】 浏览:1390
Tags:Java SpringBoot 集成 RabbitMQ 战和 总结
og.info("param:{msg3 = [" + msg + "]} info:");
}


}
注解将消息和消息头注入消费者方法
在上面也看到了@Payload等注解用于注入消息。这些注解有:


@Header 注入消息头的单个属性
@Payload 注入消息体到一个JavaBean中
@Headers 注入所有消息头到一个Map中
这里有一点主要注意,如果是com.rabbitmq.client.Channel,org.springframework.amqp.core.Message和org.springframework.messaging.Message这些类型,可以不加注解,直接可以注入。
如果不是这些类型,那么不加注解的参数将会被当做消息体。不能多于一个消息体。如下方法ExampleEvent就是默认的消息体:


public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);
关于消费者确认
RabbitMq消费者可以选择手动和自动确认两种模式,如果是自动,消息已到达队列,RabbitMq对无脑的将消息抛给消费者,一旦发送成功,他会认为消费者已经成功接收,在RabbitMq内部就把消息给删除了。另外一种就是手动模式,手动模式需要消费者对每条消息进行确认(也可以批量确认),RabbitMq发送完消息之后,会进入到一个待确认(unacked)的队列,如下图红框部分:


如果消费者发送了ack,RabbitMq将会把这条消息从待确认中删除。如果是nack并且指明不要重新入队列,那么该消息也会删除。但是如果是nack且指明了重新入队列那么这条消息将会入队列,然后重新发送给消费者,被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利用这一点,如果每次都重新回队列会导致同一消息不停的被发送和拒绝。消费者在确认消息之前和RabbitMq失去了连接那么消息也会被重新投递。所以手动确认模式很大程度上提高可靠性。自动模式的消息可以提高吞吐量。


spring手动确认消息需要将SimpleRabbitListenerContainerFactory设置为手动模式:


    simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);


手动确认的消费者代码如下:


@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.CONFIRM_KEY),
containerFactory = "containerWithConfirm")
public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
@Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) {
try {
log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("consume confirm error!", e);
//这一步千万不要忘记,不会会导致消息未确认,消息到达连接的qos之后便不能再接收新消息
//一般重试肯定的有次数,这里简单的根据是否已经重发过来来决定重发。第二个参数表示是否重新分发
channel.basicReject(deliveryTag, !redelivered);
//这个方法我知道的是比上面多一个批量确认的参数
// channel.basicNack(deliveryTag, false,!redelivered);
}
}
关于spring的AcknowledgeMode需要说明,他一共有三种模式:NONE,MANUAL,AUTO,默认是AUTO模式。这比RabbitMq原生多了一种。这一点很容易混淆,这里的NONE对应其实就是RabbitMq的自动确认,MANUAL是手动。而AUTO其实也是手动模式,只不过是Spring的一层封装,他根据你方法执行的结果自动帮你发送ack和nack。如果方法未抛出异常,则发送ack。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队列。如果抛出异常时AmqpRejectAndDontRequeueException则发送nack不会重新入队列。我有一个例子专门测试NONE,见CunsumerWithNoneTest。


还有一点需要注意的是消费者有一个参数prefetch,它表示的是一个Channel(也就是SimpleMessageListenerContainer的一个线程)预取的消息数量,这个参数只会在手动确认的消费者才生效。可以客户端利用这个参数来提高性能和做流量控制。如果prefetch设置的是10,当这个Channel上unacked的消息数量到达10条时,RabbitMq便不会在向你发送消息,客户端如果处理的慢,便可以延迟确认在方法消息的接收。至于提高性能就非常容易理解,因为这个是批量获取消息,如果客户端处理的很快便不用一个一个去等着去新的消息。SpringAMQP2.0开始默认是250,这个参数应该已经足够了。注意之前的版本默认值是1所以有必要重新设置一下值。当然这个值也不能设置的太大,RabbitMq是通过round robin这个策略来做负载均衡的,如果设置的太大会导致消息不多时一下子积压到一台消费者,不能很好的均衡负载。另外如果消息数据量很大也应该适当减小这个值,这个值过大会导致客户端内存占用问题。如果你用到了事务的话也需要考虑这个值的影响,因为事务的用处不大,所以我也没做过多的深究。


关于发送者确认模式
考虑这样一个场景:你发送了一个消息给Rabbit

首页 上一页 1 2 3 4 5 6 下一页 尾页 3/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java程序语言的后门-反射机制 下一篇Java Bean与Map之间相互转化的实现

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目