ction,返回后会在进行设置一下该ServerConnection的包头大小、最大包大小、设置连接的发送缓冲区队列、超时时间、字符编码,到此,工厂完成了新建连接的工作,返回一个连接的对象。返回后将该连接分配给一个processor,该processor会将该连接保存,processor也会对连接进行定期检查。5、 processor还会向自己的reactorR进行注册该连接,加入reactorR的处理队列,并唤醒阻塞的select()方法。
反应堆中Reactor的R线程运行代码:
| 02 |
final Selector selector = this.selector; |
| 06 |
int res = selector.select(); |
| 07 |
LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res); |
| 09 |
Set keys = selector.selectedKeys(); |
| 11 |
for (SelectionKey key : keys) { |
| 12 |
Object att = key.attachment(); |
| 13 |
if (att != null && key.isValid()) { |
| 14 |
int readyOps = key.readyOps(); |
| 15 |
if ((readyOps & SelectionKey.OP_READ) != 0) { |
| 16 |
LOGGER.debug("select读事件"); |
| 17 |
read((NIOConnection) att); |
| 18 |
} else if ((readyOps & SelectionKey.OP_WRITE) != 0) { |
| 19 |
LOGGER.debug("select写事件"); |
| 20 |
write((NIOConnection) att); |
| 31 |
} catch (Throwable e) { |
该R线程也会一直循环运行,如果向该selector注册过的channel没有对应的感兴趣的事件发生,就会阻塞,直到有感兴趣的事件发生或被wakeup。返回后会运行register函数,将之前加入该reactor连接队列中的所有连接向该selector注册OP_READ事件。该注册的动作会调用Connection对象中的register方法进行注册
channel.register(selector, SelectionKey.OP_READ, this);
注意最后一个this指针参数,表示将该连接作为附件,注册到selector,当有感兴趣的时间发生时,函数selector.selectedKeys()返回的SelectionKey集合中的对象中使用key.attachment()即可获取到上面注册时绑定的connection对象指针附件。目的就是为了通过该附件对象调用该连接类中定义的read函数来完成功能。如下所示:
| 1 |
private void read(NIOConnection c) { |
| 4 |
} catch (Throwable e) { |
| 5 |
c.error(ErrorCode.ERR_READ, e); |
6、 连接类中定义的read函数定义在AbstractConnection类中。在该read函数(该read函数涉及到的逻辑比较复杂,先不深究)中,完成从channel中读取数据到buffer,然后从buffer中提取byte数据交给具体子类(FrontendConnection)的handle()方法进行处理。
7、 该方法会从processor的线程池中获取一个线程,来异步执行数据的处理。处理会调用成员handler的handle方法来对数据进行处理。这里,在FrontendConnection的构造函数中定handler设置为FrontendAuthenticator(进行前端认证)。
| 01 |
public void handle(final byte[] data) { |
| 04 |
processor.getHandler().execute(new Runnable() { |
| 10 |
} catch (Throwable t) { |
| 11 |
error(ErrorCode.ERR_HANDLE_DATA, t); |
8、 handler在构造函数中初始化成前端认证处理器,用于处理前端权限认证。
| 1 |
public FrontendConnection(SocketChannel channel) { |
| 5 |
this.handler = new FrontendAuthenticator(this); |
9、 由于Cobar是基于MySQL协议的,所以需要分析一下MySQL协议的具体格式。下面就先分析一下MySQL认证数据包的格式:
每个报文都分为消息头和消息体两部分,其中消息头是固定的四个字节,报文结构如下:

登录认证报文的报文数据部分格式如下:

10、 FrontendAuthenticator类对上面的数据包的具体处理如下: