设为首页 加入收藏

TOP

Spring下ActiveMQ实战(二)
2015-11-10 13:45:00 来源: 作者: 【 】 浏览:10
Tags:Spring ActiveMQ 实战
red
? ? @Qualifier("taskServerTemplate")
? ? private JmsTemplate jmsTemplate;


? ? /**
? ? * 发送消息
? ? */
? ? public void sendMessage(MessageModel msg) {
? ? ? ? jmsTemplate.convertAndSend(msg);
? ? }


}


把这个类TaskServerSender注入到任意需要用到的地方,调用sendMessage方法即可。它会往前面定义的“server_topic”中塞消息,等Worker来取。


4、关于Zookeeper配置MQ连接信息


Worker端的配置我这里不再阐述,因为它跟在Server端的配置太相像,区别就在于Server端是从worker_topic中取消息,往server_topic中写消息;而Worker端的代码则是反过来,往worker_topic中写消息,从server_topic中取消息。


那么如何使用Java代码来控制ActiveMQ的配置消息呢:


package lekko.mq.util;


import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


/**
?* 获取MQ配置
?* @author lekkoli
?*/
public class MQPropertiesFactory {
? ?
? ? private static boolean isLoaded = false;
? ? private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
? ? private static ZooKeeper _zk;
? ? private static String _ip;
? ? private static String _port;


? ? private static String getProperty(String path) throws Exception {
? ? ? ? if (_zk == null) {
? ? ? ? ? ? if (ZOOKEEPER_CLUST == null) {
? ? ? ? ? ? ? ? throw new Exception("Zookeeper, Host \"" + ZOOKEEPER_CLUST + "\" is null!");
? ? ? ? ? ? }
? ? ? ? ? ? _zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
? ? ? ? }
? ? ? ? Stat s = _zk.exists(path, false);
? ? ? ? if (s != null)
? ? ? ? ? ? return new String(_zk.getData(path, false, s));
? ? ? ? throw new Exception("Zookeeper, Path \"" + path + "\" is not exist!");
? ? }


? ? private static void load() throws Exception {
? ? ? ? if (!isLoaded) {
? ? ? ? ? ? _ip = getProperty("/mq/activemq/ip");
? ? ? ? ? ? _port = getProperty("/mq/activemq/port");
? ? ? ? ? ? isLoaded = true;
? ? ? ? }
? ? }


? ? public static String getUrl() throws Exception {
? ? ? ? load();
? ? ? ? StringBuilder failover = new StringBuilder();
? ? ? ? String[] ips = _ip.split(";"), ports = _port.split(";");
? ? ? ? for (int i = 0; i < ips.length; ++i) {
? ? ? ? ? ? failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
? ? ? ? }
? ? ? ? failover.setLength(failover.length() - 1);
? ? ? ? String failovers = failover.toString();
? ? ? ? if (ips.length > 1) {
? ? ? ? ? ? failovers = "failover:(" + failovers + ")";
? ? ? ? }
? ? ? ? return failovers;
? ? }
}


上面的代码需要解释的地方跟MQ相关的不多,主要就是如果是mq集群,则格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代码没有对Zookeeper集群都挂了的情况,做应急连接方案。当然,无论如何本节都不是全文的重点,但是多学一技何尝不可?


推荐阅读:


首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇GCC 库的链接顺序问题 下一篇编译安装Memcached时提示找不到GCC

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: