设为首页 加入收藏

TOP

ActiveMQ结合Spring收发消息(三)
2018-10-11 16:12:59 】 浏览:456
Tags:ActiveMQ 结合 Spring 收发 消息

消息消费者服务

@Service
public class ConsumerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    //从指定的Destination接收消息
    public TextMessage recive(Destination destination){
        TextMessage message = (TextMessage) jmsTemplate.receive(destination);
        try {
            System.out.println("从队列" + destination.toString() + "收到了消息" + message.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return message;
    }
    //从默认的Destination接收消息
    public void reciveDefault(){

        Destination destination = jmsTemplate.getDefaultDestination();
        jmsTemplate.setReceiveTimeout(5000);
        while(true){
            TextMessage message = (TextMessage) jmsTemplate.receive(destination);
            try {
                //这里还是同一个消费者
                System.out.println("消费者  从目的地 " + destination.toString() + " 收到了消息" + message.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

生产者

直接在 main 方法中获取 ApplicationContext 运行,便于测试。

@Component
public class MsgProducer {
    @Autowired
    private ProducerService producerService;
    public void send(){
        System.out.println("生产者开始发送消息:");
        for(int i = 1; i < 11; i++){
            String msg = "生产者发出的消息";
            producerService.sendMessageDefault(msg + "-----" + i);
        }
    }
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
        MsgProducer msgProducer = context.getBean(MsgProducer.class);
        msgProducer.send();
    }
}

消费者

@Component
public class MsgConsumer {
    @Autowired
    private ConsumerService consumerService;
    public void recive(){
        System.out.println("消费者 1 开始接收消息:");
        consumerService.reciveDefault();
    }
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
        MsgConsumer msgConsumer = context.getBean(MsgConsumer.class);
        msgConsumer.recive();
    }
}

接下来就可以启动项目。同样是使用两种方式测试。

第一种方式————点对点(Queue)

同步的方式

先启动生产者发送10条消息, 再启动消费者,可以看到控制台显示成功收到10条消息。

异步监听的方式

通过监听器即可实现异步接收消息的效果,而不是像上面使用 while() 轮询同步的方式。
项目中一般都是使用异步监听的方式,在 A 服务中发送了一条消息,B 服务可以利用消息监听器监听,当收到消息后,进行相应的操作。

消息监听器(3种)

通过继承 JMS 中的 MessageListener 接口,实现 onMessage() 方法,就可以自定义监听器。这是最基本的监听器。(可根据业务实现自定义的功能)

另外spring也给我们提供了其他类型的消息监听器,比如 SessionAwareMessageListener,它的作用不仅可以接收消息,还可以发送一条消息通知对方表示自己收到了消息。(还有一种是 MessageListenerAdapter)

一个简单的自定义监听器如下:收到消息后打印消息

public class QueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        //如果有消息
        TextMessage tmessage = (TextMessage) message;
        try {
            if(tmessage != null){
                System.out.println("监听器监听消息:"+tmessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

在 ActiveMQ.xml 中引入消息监听器:

<!-- 配置消息队列监听者(Queue) -->
    <bean id="queueMessageListener" class="com.service.QueueMessageListener" />

 <!-- 显示注入消息监听容器,配置连接工厂,监听的目标是QueueDestination 或 topicDestination,监听器是上面自定义的监听器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />  
		
编程开发网
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java并发之Condition的实现分析 下一篇ERROR 1044 (42000) : Access den..