c void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
factory.setHost("192.168.3.100"); // ip 默认值 localhost
factory.setPort(5673); // 端口 默认值 5672
factory.setVirtualHost("/"); // 虚拟机 默认值 /
factory.setUsername("guest"); // 用户名 默认值 guest
factory.setPassword("guest"); // 密码 默认值 guest
// 3、创建连接 Connection
Connection connection = factory.newConnection();
// 4、创建频道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6、创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1、queue:队列名称
2、exchange:交换机名称
3、routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingkey设置为""
*/
// 队列1绑定 info warning error
channel.queueBind(queue1Name, exchangeName, "info");
channel.queueBind(queue1Name, exchangeName, "warning");
channel.queueBind(queue1Name, exchangeName, "error");
// 队列2绑定 error
channel.queueBind(queue2Name, exchangeName, "error");
// 8、发送消息
String body = "日志信息:张三调用了delete方法...出错误了...日志级别:error...";
channel.basicPublish(exchangeName, "error", null, body.getBytes());
body = "日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName, "info", null, body.getBytes());
// 9、释放资源
channel.close();
connection.close();
}
}
2、消费者
消费者1
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
factory.setHost("192.168.3.100"); // ip 默认值 localhost
factory.setPort(5673); // 端口 默认值 5672
factory.setVirtualHost("/"); // 虚拟机 默认值 /
factory.setUsername("guest"); // 用户名 默认值 guest
factory.setPassword("guest"); // 密码 默认值 guest
// 3、创建连接 Connection
Connection connection = factory.newConnection();
// 4、创建频道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name, true, false, false, null);
// 6、接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
// 7、释放资源 不需要
}
}
消费者2
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置参数
factory.setHost("192.168.3.100"); // ip 默认值 localhost
factory.setPort(5673); // 端口 默认值 5672
factory.setVirtualHost(
|