@Override
public void run() {
try {
//10s 进行一次主从同步
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
3.1、syncTopicConfig
//从Master获取TopicConfig信息,最终调用的是AdminBrokerProcessor.getAllTopicConfig
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
.assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable());
//将topicConfig进行持久化,对应的文件为topics.json
this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak)
3.2、syncConsumerOffset
//从"主Broker"获取ConsumerOffset
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
//设置从的offsetTable
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
//并持久化到从的consumerOffset.json文件中
this.brokerController.getConsumerOffsetManager().persist();
3.3、syncDelayOffset
String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
String fileName = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
.getMessageStoreConfig().getStorePathRootDir());
MixAll.string2File(delayOffset, fileName);
3.4、syncSubscriptionGroupConfig
SubscriptionGroupWrapper subscriptionWrapper =this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak);
SubscriptionGroupManager subscriptionGroupManager =this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
四、思考与收获
通过上面的分享,我们基本上了解了RocketMQ的主从复制原理,其中有些思想我们可以后续借鉴下:
-
在功能设计的时候将元数据、程序数据分开管理;
-
主从复制的时候,基本思想都是从请求主,请求时带上offset,然后主查询数据返回从,从再执行;mysql的主从复制、redis的主从复制基本也是这样;
-
主从复制包括异步复制、同步复制两种方式,可以通过配置来决定使用哪种同步方式,这个需要根据实际业务场景来决定;
-
主从复制线程尽量和消息写线程或者主线程分开;
由于时间、精力有限,难免会有纰漏、考虑不到之处,如有问题欢迎沟通、交流。