mer(channel) {
/**
 * Handles one delivered message: prints its payload, pauses for one second,
 * then manually acknowledges the delivery on the enclosing {@code channel}.
 *
 * @param consumerTag the consumer tag associated with this consumer (unused here)
 * @param envelope    delivery metadata; its delivery tag is used for the ack
 * @param properties  message properties (unused here)
 * @param body        raw message payload, decoded as UTF-8 text for printing
 * @throws IOException if acknowledging the delivery fails
 */
@Override
public void handleDelivery(String consumerTag,
        Envelope envelope,
        AMQP.BasicProperties properties,
        byte[] body) throws IOException {
    // Decode with an explicit charset; new String(byte[]) would use the platform default.
    System.out.println("recv message:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));

    boolean interrupted = false;
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        // Remember the interruption instead of swallowing it; the flag is restored below.
        interrupted = true;
    }

    try {
        // false: acknowledge only this delivery tag rather than all tags up to it.
        channel.basicAck(envelope.getDeliveryTag(), false);
    } finally {
        if (interrupted) {
            // Restore the interrupt status after the ack so the owning thread can observe it.
            Thread.currentThread().interrupt();
        }
    }
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
运行结果:
recv message:Hello World!
再次查看RabbitMQ管理界面,可以看到此时队列中的消息数已经变为0:
2 Spring Boot连接
2.1 maven依赖
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 application.properties文件
#RabbitMQ配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.3 配置类代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* RabbitMQ配置类
* @author Robert Hou
* @date 2019年7月1日
*/
@Configuration
public class RabbitMQConfig {
/**
* EXCHANGE名称
*/
public static final String FANOUT_EXCHANGE = "test.fanout";
public static final String DIRECT_EXCHANGE = "test.direct";
public static final String TOPIC_EXCHANGE = "test.topic";
/**
* QUEUE名称
*/
public static final String FANOUT_QUEUE = "test.fanout.queue";
public static final String DIRECT_QUEUE = "test.direct.queue";
public static final String TOPIC_QUEUE = "test.topic.queue";
/**
* ROUTINGKEY名称
*/
public static final String DIRECT_ROUTINGKEY = "direct";
public static final String TOPIC_ROUTINGKEY = "topic.#";
@Bean
public ConnectionFactory connectionFactory(Environment environment) {
String addresses = environment.getProperty("spring.rabbitmq.addresses");
int port = environment.getProperty("spring.rabbitmq.port", Integer.class);
String username = environment.getProperty("spring.rabbitmq.username");
String password = environment.getProperty("spring.rabbitmq.password");
String virtualHost