作者:京东物流 宫丙来
一、主从复制概述
-
RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。
-
CommitLog的消息复制是发生在消息写入时,当消息写完Broker Master时,会通过单独的线程,将消息写入到从服务器,在写入的时候支持同步写入、异步写入两种方式。
-
Broker元数据的写入,则是Broker从服务器通过单独的线程每隔10s从主Broker上获取,然后更新从的配置,并持久化到相应的配置文件中。
-
RocketMQ主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但可以提供消息读取。
二、CommitLog消息复制
2.1、整体概述
CommitLog主从复制的流程如下:
1.Producer发送消息到Broker Master,Broker进行消息存储,并调用handleHA进行主从同步;
2.如果是同步复制的话,参考2.6章节的同步复制;
3.如果是异步复制的话,流程如下:
1. Broker Master启动,并在指定端口监听;
2. Broker Slave启动,主动连接Broker Master,通过Java NIO建立TCP连接;
3. Broker Slave以每隔5s的间隔时间向服务端拉取消息,如果是第一次拉取的话,先获取本地CommitLog文件中最大的偏移量,以该偏移量向服务端拉取消息
4. Broker Master 解析请求,并返回数据给Broker Slave;
5.Broker Slave收到一批消息后,将消息写入本地CommitLog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
我们先看下异步复制的整体流程,最后再看下同步复制的流程,异步复制的入口为HAService.start();
public void start() throws Exception {
//broker master启动,接收slave请求,并处理
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
//同步复制线程启动
this.groupTransferService.start();
//broker slave启动
this.haClient.start();
}
下面分别对上面的每一步做详细说明。
2.2、HAService Master启动
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
在beginAccept方法中主要创建了ServerSocketChannel、Selector、设置TCP reuseAddress、绑定监听端口、设置为非阻塞模式,并注册OP_ACCEPT(连接事件)。可以看到在这里是通过Java原生的NIO来实现的,并没有通过Netty框架来实现。
acceptSocketService.start()启动方法代码如下:
while (!this.isStopped()) {
try {
//获取事件
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
//处理OP_ACCEPT事件,并创建HAConnection
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAConnection conn = new HAConnection(HAService.this, sc);
//主要是启动readSocketService,writeSocketService这两个线程
conn.start();
HAService.this.addConnection(conn);
}
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
选择器每1s处理一次处理一次连接就绪事件。连接事件就绪后,调用ServerSocketChannel的accept()方法创建SocketChannel,与服务端数据传输的通道。然后为每一个连接创建一个HAConnection对象,该HAConnection将负责Master-Slave数据同步逻辑。HAConnection.start方法如下:
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
2.3、HAClient启动
while (!this.isStopped()) {
try {
//和broker master建立连接,通过java nio来实现
if (this.connectMaster()) {
//在心跳的同时,上报offset
if (this.isTimeToReportOffset()) {
//上报offset
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
//处理网络读请求,也就是处理从Master传回的消息数据
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =