原文:juejin.cn/post/6998363970037874724
前言
Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 实现,是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息传递指的是应用程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此通信,直接调用通常是指远程过程调用的技术。
核心组成
- Server:又称 Broker,接收客户端的连接,实现 AMQP 实体服务,安装 rabbitmq-server
- Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
- Channel:网络信道,几乎所有操作都在 Channel 中进行,Channel 是进行消息读写的通道,客户端可以建立多个 Channel,每个 Channel 代表一个会话任务。
- Message:消息,服务与应用程序之间传送的数据,由 Properties 和 Body 组成,Properties 可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body 则是消息体的内容。
- Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个 exchange 和 queue,同一个虚拟主机里面不能有相同名称的 exchange
- Exchange:交换机,接收消息,根据路由键发送消息到绑定的队列(不具备消息存储能力)
- Bindings:exchange 和 queue 之间的虚拟连接,binding 中可以保存多个 routing key
- Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
- Queue:队列,也称为 Message Queue,消息队列,保存消息并将它们转发给消费者
Rabbitmq 消息模式
3.1 Simple 模式
Simple 模式是最简单的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将消息通过交换机(此时,图中并没有交换机的概念,如不定义交换机,会使用默认的交换机)把消息存储到队列,消费者从队列中取出消息进行处理。
用 Java demo 实现此模式,推荐一个开源免费的 Spring Boot 最全教程:
https://github.com/javastacks/spring-boot-best-practice
Productor
public class Send {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、创建连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello world";
// 4、发送消息到指定队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException | IOException e) {
e.printStackTrace();
} finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Customer
public class Recv {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
// 2、获取 Connection和 Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
观察可视化界面,会看到消息先会被写入到队列中,随后又被消费者消费了。
3.2 Fanout 模式
Fanout——发布订阅模式,是一种广播机制。
此模式包括:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机,交换机不存储消息,将消息存储到队列,消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上