1、 RocketMQ安装测试
1.1 下载解压
下载地址:https://rocketmq.apache.org/release-notes/
rocketmq-all-5.0.0-bin-release.zip
下载后上传到服务器;
解压命令# unzip rocketmq-all-5.0.0-bin-release.zip
1.2 启动 测试
RocketMQ默认配置是比较好的,这样可以直接应用于生产环境,所以如果机器内存较小,启动会因为内存不足失败,为了避免后面启动失败,选择先修改其内存大小,一般阿里云服务器是满足不了默认内存。
手动调整JVM的配置,单位从g改为m
1.2.1 启动nameserver
1.2.1.1 修改runbroker.sh和runserver.sh
1.2.1.2 runbroker.sh
-server -Xms256m -Xmx256m -Xmn128m
1.2.1.3 runserver.sh
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
1.2.1.4 启动nameserver
解压目录执行# nohup ./bin/mqnamesrv -n 1.117.75.57(自己的ip):9876 &
1.2.2 启动broker
1.2.2.1 修改broker.conf
添加namesrvAddr 和 brokerIP1:
1.2.2.3 启动 borker
解压目录执行# nohup ./bin/mqbroker -n 1.117.75.57:9876 -c ./conf/broker.conf &
1.2.2.4 查看启动情况
jps
1.2.3 测试
由于服务器内存可能比较小,建议先关闭其他应用,比如rabbitmq,docker的容器等;
还需要开启几个端口:9876,10909,10910,10911;
1.2.3.1 生产者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
发送消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
1.2.3.2 消费者
导出环境变量# export NAMESRV_ADDR=1.117.75.57:9876
消费消息# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
1.2.4 关闭命令
关闭nameserver# bin/mqshutdown namesrv
关闭broker# bin/mqshutdown broker
1.2.5 RocketMQ控制台
1.2.5.1 下载,解压,修改配置信息
1.2.5.2 访问控制台
localhost:9696
2、RocketMQ框架原理
2.1 框架
2.2 概念
整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer:
- Broker(邮递员):Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能;
- NameServer(邮局):消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息Producer(寄件人)消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息;
- Consumer(收件人) :消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息;
- Topic(地区):用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息Message Queue(邮件)为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息;
- Message:Message 是消息的载体;
- Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
- Consumer Group:消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
3、RocketMQ整合
3.1 rocketmq模块 发送消息
3.2.1.1 依赖
<!-- rocket -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
3.2.1.2 配置
# rocketmq配置
rocketmq:
#rocketMQ服务的地址
name-server: 1.117.75.57:9876
# 生产者组
producer:
group: kh96-sendsms-group
3.2.1.3 请求
@Autowired(required = false)
private RocketMQTemplate rocketMQTemplate;
/**
* @param : [phoneNo]
* @return : java.lang.String
* @author : huayu
* @date : 30/11/2022
* @description : 测试发送消息到用户中心,用户中心给手机号发信息
*/
@RequestMapping("/testRocketMQSendMsg")
public String testRocketMQSendMsg(@RequestParam String phoneNo) {
log.info("------ 使用RocketMQ,测试给手机:{},发送消息 -------", phoneNo);
//使用RocketMQ发送消息
rocketMQTemplate.convertAndSend("rocketmq-send-sms-kh96", phoneNo);
return "send msg success";
}
3.2 user模块 消费消息
1.添加加rocketmq的依赖;
2.用户服务,监听发送短信的请求发送消息:
/**
* Created On : 30/11/2022.
* <p>
* Author : huayu
* <p>
* Description: 用户服务,监听发送短信的请求发送消息
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "rocketmq-user-sms-group"