设为首页 加入收藏

TOP

通过源码分析RocketMQ主从复制原理(一)
2023-07-25 21:40:35 】 浏览:95
Tags:通过源 RocketMQ 从复制

作者:京东物流 宫丙来

一、主从复制概述

  • RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。

  • CommitLog的消息复制是发生在消息写入时,当消息写完Broker Master时,会通过单独的线程,将消息写入到从服务器,在写入的时候支持同步写入、异步写入两种方式。

  • Broker元数据的写入,则是Broker从服务器通过单独的线程每隔10s从主Broker上获取,然后更新从的配置,并持久化到相应的配置文件中。

  • RocketMQ主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但可以提供消息读取。

二、CommitLog消息复制

2.1、整体概述

CommitLog主从复制的流程如下:

1.Producer发送消息到Broker Master,Broker进行消息存储,并调用handleHA进行主从同步;
2.如果是同步复制的话,参考2.6章节的同步复制;
3.如果是异步复制的话,流程如下:

1. Broker Master启动,并在指定端口监听;
2. Broker Slave启动,主动连接Broker Master,通过Java NIO建立TCP连接;
3.  Broker Slave以每隔5s的间隔时间向服务端拉取消息,如果是第一次拉取的话,先获取本地CommitLog文件中最大的偏移量,以该偏移量向服务端拉取消息
4.  Broker Master 解析请求,并返回数据给Broker Slave;
5.Broker Slave收到一批消息后,将消息写入本地CommitLog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;

我们先看下异步复制的整体流程,最后再看下同步复制的流程,异步复制的入口为HAService.start();

public void start() throws Exception {
 //broker master启动,接收slave请求,并处理
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
 //同步复制线程启动
    this.groupTransferService.start();
 //broker slave启动
    this.haClient.start();
}

下面分别对上面的每一步做详细说明。

2.2、HAService Master启动

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

在beginAccept方法中主要创建了ServerSocketChannel、Selector、设置TCP reuseAddress、绑定监听端口、设置为非阻塞模式,并注册OP_ACCEPT(连接事件)。可以看到在这里是通过Java原生的NIO来实现的,并没有通过Netty框架来实现。

acceptSocketService.start()启动方法代码如下:

while (!this.isStopped()) {
    try {
   //获取事件
        this.selector.select(1000);
        Set<SelectionKey> selected = this.selector.selectedKeys();
        if (selected != null) {
            for (SelectionKey k : selected) {
//处理OP_ACCEPT事件,并创建HAConnection
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                       HAConnection conn = new HAConnection(HAService.this, sc);
                       //主要是启动readSocketService,writeSocketService这两个线程
 conn.start();
                       HAService.this.addConnection(conn);
                    }
                }
            }
            selected.clear();
        }
    } catch (Exception e) {
        log.error(this.getServiceName() + " service has exception.", e);
    }
}

选择器每1s处理一次处理一次连接就绪事件。连接事件就绪后,调用ServerSocketChannel的accept()方法创建SocketChannel,与服务端数据传输的通道。然后为每一个连接创建一个HAConnection对象,该HAConnection将负责Master-Slave数据同步逻辑。HAConnection.start方法如下:

public void start() {
	this.readSocketService.start();
	this.writeSocketService.start();
}

2.3、HAClient启动

while (!this.isStopped()) {
	try {
		//和broker master建立连接,通过java nio来实现
		if (this.connectMaster()) {
			//在心跳的同时,上报offset
			if (this.isTimeToReportOffset()) {
				//上报offset
				boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
				if (!result) {
					this.closeMaster();
				}
			}
			this.selector.select(1000);
			//处理网络读请求,也就是处理从Master传回的消息数据
			boolean ok = this.processReadEvent();
			if (!ok) {
				this.closeMaster();
			}
			if (!reportSlaveMaxOffsetPlus()) {
				continue;
			}
			long interval =
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Proxyless Mesh 在 Dubbo 中的实践 下一篇三台服务器使用docker搭建redis一..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目