设为首页 加入收藏

TOP

RocketMQ 源码学习 4 : 消息发送(二)
2018-06-05 08:53:01 】 浏览:457
Tags:RocketMQ 源码 学习 消息 发送
lag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); //TODO,暂不知道这个字段是干嘛用的 requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch);

最后用这些 header 字段,以及 message body 构建 RemotingCommand,通过 remoting 模块发给 broker.

4. 处理结果

  • 发送成功:直接返回发送结果
  • 发送失败:如果 enable retryAnotherBrokerWhenNotStoreOK,就会重试,默认重试两次(retryTimesWhenSendFailed)。否则直接返回结果
  • 发送异常:Producer 对异常做了很好的区分,如果是 Remoting 和 Client 模块的异常,就重试,如果是 Broker 模块的异常,根据不同的 response code 做不同的处理,有的重试,有的抛出异常,有的返回结果。

3. Broker端,消息的处理和落地


如图,Broker 有很多 Processor 用来处理不同类型的请求,有些 Processor 会共用一个 Processor 线程池。对于消息发送,Broker 的 remoting 模块在接收到请求后,根据request code,最终会交给 SendMessageProcessor 来处理。SendMessageProcessor 会依次做以下处理:

  • 做一些校验,包括但不限于
    1. broker 是否可写
    2. topic 配置是否存在,如果不存在就新建一个(createTopicInSendMessageMethod)
    3. 校验 queueId 是否超过指定大小
  • 构建 MessageExtBrokerInner
  • 将 MessageExtBrokerInner 交给 Store 处理
  • 处理 Store 返回的结果,BrokerStatsManager 做一些统计更新,设置 Response 中的一些字段并返回。

Store 收到消息后,会先做一些校验,然后交给 commitLog 去 put,然后做些统计并返回。Store 存储消息的过程比较复杂,后面会单独分析。

4. 其他

1. 顺序消息
很多应用并不关注消息顺序,而且消息没有顺序并不代表消息内容没有顺序,合理的系统设计可以避免顺序问题。MQ 要保证消息顺序必然会损失性能、增加系统实现复杂度。具体的分析可以看 分布式开放消息系统(RocketMQ)的原理与实践

在 RocketMQ 里, 在发送消息的时候可以自己定义 MessageQueueSelector,对于同一个订单ID(或其他ID)的不同消息,可以让它走同一个 MessageQueue,这样就可以按顺序发给同一个 Broker 了。

2. Batch Message
Producer 的 API 还支持一次发多个消息。 

List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));

producer.send(messages);

Client 模块会将 Message List 封装成 MessageBatch,且会标记 requestHeader 的 batch 标志位为 true. Broker 在接收到消息后就可以根据这个标志位去做不同的处理。

5. Reference

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇JDK 源码阅读 : FileDescriptor 下一篇深入Spring Boot :怎样排查 java..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目