设为首页 加入收藏

TOP

RocketMQ 源码学习 4 : 消息发送(一)
2018-06-05 08:53:01 】 浏览:456
Tags:RocketMQ 源码 学习 消息 发送

1. Client端,三种发送方式

RocketMQ 支持常见的三种发送方式,

  • SYNC
producer.send(msg)

同步的发送方式,会等待发送结果后才返回。可以用 send(msg, timeout) 的方式指定等待时间,如果不指定,就是默认的 3000ms. 这个timeout 最终会被设置到 ResponseFuture 里,再发送完消息后,用 countDownLatch 去 await timeout的时间,如果过期,就会抛出异常。

  • ASYNC
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的 3000ms.

  • ONEWAY
producer.sendOneway(msg);

比较简单,发出去后,什么都不管直接返回。

对于每种方式,Producer 还提供了可以指定 MessageQueue, MessageQueueSelector的API,这属于稍微高端一点的玩法,一般用它提供的默认的策略选择 MessageQueue 就可以了。

2. Client端发送过程

下面以 SYNC 方式为例,看下整个消息的发送过程,其他方式略有差异,总体流程类似。

1. 根据 Topic 找到指定的 TopicPublishInfo

先去本地 map 找,如果没有,就去 Namesrv fetch, 如果 Namesrv 里也没有,则用默认的 Topic 再去 fetch TopicRouteData. 对用用默认 Topic 的这种情况,Client 拿到数据后,会去构建 TopicPublishInfo, 然后用当前的 Topic 作为 key 放到本地 map 里。Broker 在接收到消息的时候,会去更新它本地的配置,然后在 registerBroker 的时候会去更新 namesrv 中的 TopicRouteData 信息,这样 Namesrv 中就会有这样一份配置了。当然,也可以事先在 Namesrv 增加该配置,很多公司内部都有这样定制的平台来管理MQ的接入配置。

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class TopicRouteData {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

QueueData 定义了这个 read 和 write 的 queue的数量,Client 在拿到 TopicRouteData 后,会根据这里配的数量去构建响应数目的messageQueue,即 messageQueueList. brokerDatas 保存了各个 broker 的相关信息。

2. 从 messageQueueList 中选择一个 MessageQueue

如果没有 enable latencyFaultTolerance,就用递增取模的方式选择。如果 enable 了,在递增取模的基础上,再过滤掉 not available 的。这里所谓的 latencyFaultTolerance, 是指对之前失败的,按一定的时间做退避:

long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

举个例子,如果上次请求的 latency 超过 550L ms, 就退避 3000L ms;超过 1000L,就退避 60000L.

以上就是 Producer 到 Broker 的简单的负载均衡。

3. 发送消息

到这一步,我们已经拿到了这些关键数据:

  • Message, 要发送的消息
  • MessageQueue,这里面包括 topic/brokerName/queueId
  • CommunicationMode, 发送方式, SYNC/ASYNC/ONEWAY
  • TopicPublishInfo

有了这些数据,就可以构建 RequestHeader 了,大部分字段意思都很明显(当然,前提是对RocketMQ的源码有所熟悉),个别字段见注释。

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
//系统Flag, 用于判断走什么逻辑。标识是否压缩,事务的不同TYPE(prepare/rollback/commit/not transaction) 等
requestHeader.setSysFlag(sysFlag); 
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息Flag, 最终会落地
requestHeader.setF
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇JDK 源码阅读 : FileDescriptor 下一篇深入Spring Boot :怎样排查 java..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目