Spring+ActiveMQ+Flex- 智慧公交(二)

2014-11-24 08:14:27 · 作者: · 浏览: 2
= null; // 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息) Session session = null; // 消息的目的地 Destination destination = null; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "udp://192.168.1.22:8123"); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建Topic,名字为myTopic destination = session.createTopic("myTopic"); MessageProducer message_producer = session.createProducer(destination); // 通过消息生产者发出消息 message_producer.send(session.createTextMessage(s)); } catch (JMSException e) { e.printStackTrace(); } } }

2. Receive.java

package com.test.jms.util;


 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Session;
import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.util.UUIDUtils;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import flex.messaging.endpoints.Endpoint;

public class Receive {
	
	// Shared JMS state. Every field is static, so it is shared by all Receive
	// instances; each one is (re)assigned inside doReceive().
	//
	// NOTE(review): doReceive() assigns connectionFactory, connection, session and
	// destination on every call, but only creates messageConsumer while it is still
	// null. Repeated calls therefore appear to open extra connections that nothing
	// visible here closes — confirm against the rest of the method.

	// Factory for broker connections; doReceive() sets it to an ActiveMQConnectionFactory
	// built with the ActiveMQ default user/password constants and a broker URL.
	private static ConnectionFactory connectionFactory = null;
	// Connection obtained from connectionFactory and started in doReceive().
	private static Connection connection = null;
	// Session created from the connection with Boolean.FALSE (not transacted) and
	// Session.AUTO_ACKNOWLEDGE.
	private static Session session = null;
	// Destination the consumer listens on; doReceive() sets it to the topic "myTopic".
	private static Destination destination = null;
	// Consumer for the destination; created only when still null, then given a
	// MessageListener in doReceive().
    private static MessageConsumer messageConsumer = null;

	public void doReceive() {
		
		try {
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "udp://192.168.1.22:8123");
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
		destination = session.createTopic("myTopic");
		if (messageConsumer == null) {
			
			messageConsumer = session.createConsumer(destination);
		    //重写MessageListener类的onMessage方法
			messageConsumer.setMessageListener(new MessageListener() {
				
				public void onMessage(Message message) {
					
					try {
					TextMessage textMessage = (TextMessage) message;
					String msg = null;
					msg = textMessage.getText();
					System.out.println(msg);
			        // 获取消息代理,此处的参数就是Spring配置文件中配置的messagebroker的id
					MessageBroker messageBroker = MessageBroker.getMessageBroker("_messageBroker");
				    String clientID = UUIDUtils.createUUID();
			        //创建AsyncMessage类的对象是为了Flex端用Messaging模式接收消息
				    AsyncMessage asynMsg = new AsyncMessage();
	                // 设置消息的地址,这个必须跟Spring配置文件中信道的destination一致
	 		    	asynMsg.setDestination("market-data-feed");
	 		    	//flex 可以通过header过滤消息
					asynMsg.setHeader("msg","new");
					asynMsg.setClientId(clientID);
			        asynMsg.setMessageId(UUIDUtils.createUUID());
					asynMsg.setTimestamp(System.currentTimeMillis());
					asynMsg.setBody(msg);
				    messageBroker.routeMessageToService(asynMsg, null)