设为首页 加入收藏

TOP

RocketMQ 学习(三)
2023-07-25 21:34:39 】 浏览:107
Tags:RocketMQ 学习
rn
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
  • 创建一个消费者实例对象,指定消费者组为ldConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 ldTopic 这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息

 

再去看控制台,已消费

 

SpringBoot环境下集成RocketMQ

集成

在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。

1、引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>

2、yml配置

rocketmq:
  producer:
    group: ldProducer
  name-server: 192.168.3.158:9876

3、创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

@Component
@RocketMQMessageListener(consumerGroup = "ldConsumer", topic = "ldDelayTaskTopic")
@Slf4j
public class LdRocketMQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("获取到延迟任务消息:{}",msg);
    }
}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类

4、测试

@RestController
@Slf4j
public class RocketMQDelayTaskController {

    @Resource
    private DefaultMQProducer producer;

    @GetMapping("/rocketmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        Message msg = new Message("ldDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setDelayTimeLevel(2);
        // 发送消息并得到消息的发送结果,然后打印
        log.info("提交延迟任务");
        producer.send(msg);
    }

}

 

可能遇到的问题

搭完mq单主单从集群之后,美滋滋想发一下message, 没想到碰到一个坑爹的问题:

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.90 CQ:  0.90 INDEX:  0.90, maybe your broker machine memory too small.
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [549]ms, Topic: ldTopicA, BrokersSent: [broker-a, broker-a, broker-a]
See http://rocketmq.apache.org/docs/faq/ for further details.

    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:665)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
    at com.example.delay.MQTest.sendTest(MQTest.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
... Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CO
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇easyexcel实现导出添加文字水印 下一篇如何在 SpringBoot 项目中接入 Ch..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目