手动指定你的电脑可以访问到的服务器ip。
我的虚拟机的ip是192.168.3.158,所以就指定为192.168.3.158,如下
brokerIP1 = 192.168.3.158
brokerIP2 = 192.168.3.158
开启自动创建Topic
autoCreateTopicEnable = true
如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的
启动Broker
nohup sh bin/mqbroker -c conf/broker.conf &
-c 参数就是指定配置文件
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
当看到如下日志就说明启动成功了
关闭Broker
查看Broker 与NameServer是否运行
说明Broker与NameServer是运行状态
四、搭建可视化控制台
其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。
可视化服务其实就是一个jar包,启动就行了。
jar包可以从这获取
链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w
提取码:s0sd
将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里
然后进入/usr/rocketmq下,执行如下命名
nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &
rocketmq.config.namesrvAddr就是用来指定NameServer的地址的
查看日志
tail -f ~/logs/consolelogs/rocketmq-console.log
当看到如下日志,就说明启动成功了
然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭
通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。
功能很多,这里就不一一介绍了。
停止命令
查看进程
1.jps
- -q:只输出进程 ID
- -m:输出传入 main 方法的参数
- -l:输出完全的包名,应用主类名,jar的完全路径名
- -v:输出jvm参数
- -V:输出通过flag文件传递到JVM中的参数
2.ps aux | grep java 来获取java进程 id
结束进程
kill pid 或者(kill -9 pid)
- pid: jar包进程号
- kill pid: 结束进程,有局限性,例如后台进程,守护进程等,不能结束
- kill - 9 pid : 表示强制杀死该进程;
测试
环境搭好之后,就可以进行测试了。
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
生产者发送消息
@Test
public void sendTest() throws Exception{
//创建一个生产者,指定生产者组为ldProducer
DefaultMQProducer producer = new DefaultMQProducer("ldProducer");
// 指定NameServer的地址
producer.setNamesrvAddr("192.168.3.158:9876");
// 第一次发送可能会超时,我设置的比较大
producer.setSendMsgTimeout(60000);
// 启动生产者
producer.start();
// 创建一条消息
// topic为 ldTopic
// 消息内容为 java学习日记
// tags 为 TagA
Message msg = new Message("ldTopic", "TagA", "java学习日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
- 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为ldProducer;
- 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
- producer.start() 启动生产者
- 构建一个内容为三友的java日记的消息,然后指定这个消息往ldTopic这个topic发送
- producer.send(msg):发送消息,打印结果
- 关闭生产者
消费者消费消息
public class ConsumerMsg {
public static void main(String[] args) throws Exception {
// 通过push模式消费消息,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ldConsumer");
consumer.setNamesrvAddr("192.168.3.158:9876");
// 订阅这个topic下的所有的消息
consumer.subscribe("ldTopic", "*");
// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
}
retu