设为首页 加入收藏

TOP

Spring下ActiveMQ实战(一)
2015-11-10 13:45:00 来源: 作者: 【 】 浏览:9
Tags:Spring ActiveMQ 实战

MessageQueue是分布式的系统里经常要用到的组件,一般来说,当需要把消息跨网段、跨集群的分发出去,就可以用这个。一些典型的示例就是:


1、集群A中的消息需要发送给多个机器共享;


2、集群A中消息需要主动推送,但彼此的网络不是互通的(如集群A只有过HA才能被外界访问);



当然上面的几个点,除了用MQ还有其它实现方式,但是MQ无疑是非常适合用来做这些事的。众多MQ中,ActiveMQ是比较有名气也很稳定的,它发送消息的成本非常廉价,支持Queue与Topic两种消息机制。本文主要就是讲如何在Spring环境下配置此MQ:


1、场景假设


现有机器两台Server、Worker需要进行异步通信,另有一台ActiveMQ机器,关于MQ的配置信息存放在Zookeeper中,Zookeeper的节点有:


- /mq/activemq/ip:mq的机器ip


-/mq/activemq/port:这是mq的机器端口


2、Server的Spring XML配置


Server主要的工作就是接受Worker消息,并发送消息给Worker。主要是定义了连接MQ的连接池,接受Worker消息的队列worker,发送消息给Worker的队列server:




? ?
? ?
? ? ? ?
? ? ? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?

? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ?

? ? ? ?

? ?



? ?
? ?
? ? ? ?
? ?

? ?
? ?
? ? ? ?
? ? ? ?
? ? ? ?
? ? ? ? ? ?
? ? ? ?

? ? ? ?
? ?



? ?
? ?
? ? ? ?
? ?
? ?
? ?
? ?



一段一段地分析,ActiveMQ连接池这里,定义了连接的bean为“conFactory”,其中broberURL属性是通过后台Java代码的静态方法来设置的,方便线上环境通过Java代码动态地切换,稍后会介绍这块代码,你现在需要知道的是,它实际上返回的就是一个字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后台来管理连接信息,直接改成“”也是OK的。


接下来,便是Worker消息队列的定义,这里定义为“taskWorkerTopic”,类型是org.apache.activemq.command.ActiveMQTopic,(订阅模式)它表示一个消息可以被多个机器收到并处理,其它的还有org.apache.activemq.command.ActiveMQQueue,(点对点模式)表示一个消息只能被一台机器收到,当收到后消息就出队列了,其它机器无法处理。它们都有一个构造参数constructor-arg,指定了消息队列的名称,一个MQ中一个消息队列的名字是唯一的。


Worker的消息队列定义好了之后,就是接受Worker的里消息了,这里定义了“taskWorkerContainer”,其属性分别定义了连接池、目标队列、消息处理器(我们自己的Java类,后面再讲),参数pubSubDomain用于指定是使用订阅模式还是使用点对点模式,如果是ActiveMQTopic则要设置为true,默认是false。


好了,Server现在已经可以通过自己定义的“lekko.mq.task.TaskWorkerListener”类接受并处理taskWorkerTopic的消息了。


如法炮制,定义一个专门用于往Worker里发消息的队列“taskServerTopic”,并定义发送消息的模板“taskServerTemplate”备用。


3、Server端的接收类与发送类


lekko.mq.task.TaskWorkerListener便是一个接收类示例:


package lekko.mq.task;


import javax.jms.Message;
import javax.jms.MessageListener;


import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;



/**
?* Task消息监听类
?* @author lekko
?*/
@Service
public class TaskWorkerListener implements MessageListener {


? ? private Logger _logger = Logger.getLogger(TaskWorkerListener.class);


? ? @Override
? ? public void onMessage(Message message) {
? ? ? ? if (message instanceof ActiveMQObjectMessage) {
? ? ? ? ? ? ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? onMessage((MessageModel) aMsg.getObject());
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? _logger.warn("Message:${} is not a instance of MessageModel.", e);
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
? ? ? ? }
? ? }


? ? /**
? ? * 处理消息
? ? * @param message 自定义消息实体
? ? */
? ? public void onMessage(MessageModel message) { ... }


}


这里给大家演示的并不是最基础的知识,处理的消息是一个自定义的类“lekko.mq.model.MessageModel”,这个类怎么写可以随便整,反正就是一些你要传递的数据字段,但是记得要实现Serializable接口。如果你需要传递的仅仅是纯字符串,那么直接在代码的23行片,把message.toString()即可。这个类通过前面XML配置会处理来自“worker_topic”队列中的消息。


再就是发送类,实际上就是把前面的taskServiceTemplate拿来用就行了:


package lekko.mq.task;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;



/**
?* 服务器任务消息分发
?* @author lekko
?*/
@Service
public class TaskServerSender {


? ? @Autowi

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇GCC 库的链接顺序问题 下一篇编译安装Memcached时提示找不到GCC

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: