设为首页 加入收藏

TOP

RocketMQ 学习(二)
2023-07-25 21:34:39 】 浏览:106
Tags:RocketMQ 学习
手动指定你的电脑可以访问到的服务器ip。

我的虚拟机的ip是192.168.3.158,所以就指定为192.168.3.158,如下

brokerIP1 = 192.168.3.158
brokerIP2 = 192.168.3.158

开启自动创建Topic

autoCreateTopicEnable = true

如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

启动Broker

nohup sh bin/mqbroker -c conf/broker.conf &

-c 参数就是指定配置文件

查看日志

tail -f ~/logs/rocketmqlogs/broker.log

当看到如下日志就说明启动成功了

关闭Broker

sh bin/mqshutdown broker

查看Broker 与NameServer是否运行

jps

 说明Broker与NameServer是运行状态

四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。

可视化服务其实就是一个jar包,启动就行了。

jar包可以从这获取

链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里

然后进入/usr/rocketmq下,执行如下命名

nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &

rocketmq.config.namesrvAddr就是用来指定NameServer的地址的

查看日志

tail -f ~/logs/consolelogs/rocketmq-console.log

当看到如下日志,就说明启动成功了

然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭

 

通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。

功能很多,这里就不一一介绍了。

停止命令

查看进程

1.jps

  • -q:只输出进程 ID
  • -m:输出传入 main 方法的参数
  • -l:输出完全的包名,应用主类名,jar的完全路径名
  • -v:输出jvm参数
  • -V:输出通过flag文件传递到JVM中的参数

2.ps aux | grep java 来获取java进程 id

结束进程

kill pid 或者(kill -9 pid)

  • pid: jar包进程号
  • kill pid: 结束进程,有局限性,例如后台进程,守护进程等,不能结束
  • kill - 9 pid : 表示强制杀死该进程;

 

测试

环境搭好之后,就可以进行测试了。

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

生产者发送消息

    @Test
    public void sendTest() throws Exception{
        //创建一个生产者,指定生产者组为ldProducer
        DefaultMQProducer producer = new DefaultMQProducer("ldProducer");

        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.3.158:9876");
        // 第一次发送可能会超时,我设置的比较大
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为 ldTopic
        // 消息内容为 java学习日记
        // tags 为 TagA
        Message msg = new Message("ldTopic", "TagA", "java学习日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为ldProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为三友的java日记的消息,然后指定这个消息往ldTopic这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者

消费者消费消息

public class ConsumerMsg {
    public static void main(String[] args) throws Exception {
        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ldConsumer");
        consumer.setNamesrvAddr("192.168.3.158:9876");
        // 订阅这个topic下的所有的消息
        consumer.subscribe("ldTopic", "*");
        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
                }

                retu
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇easyexcel实现导出添加文字水印 下一篇如何在 SpringBoot 项目中接入 Ch..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目