这里我们可以来看看这个connect的事件回调方法做了什么事情吧:
//连接建立的事件回调,其实也有可能是连接超时
public void connect_event (){
boolean err = false;
SocketChannel fd = null;
try {
fd = connect (); //获取已经建立好连接的channel
} catch (ConnectException e) {
err = true;
} catch (SocketException e) {
err = true;
} catch (SocketTimeoutException e) {
err = true;
} catch (IOException e) {
throw new ZError.IOException(e);
}
io_object.rm_fd (handle); //可以将当前的IOObject从poller上面移除了,同时代表这个TCPConnector也就失效了,
handle_valid = false;
if (err) {
// Handle the error condition by attempt to reconnect.
close ();
add_reconnect_timer(); //尝试重新建立连接
return;
}
handle = null;
try {
Utils.tune_tcp_socket (fd);
Utils.tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
} catch (SocketException e) {
throw new RuntimeException(e);
}
// Create the engine object for this connection.
//创建streamEngine对象,重新封装建立好连接的channel
StreamEngine engine = null;
try {
engine = new StreamEngine (fd, options, endpoint);
} catch (ZError.InstantiationException e) {
socket.event_connect_delayed (endpoint, -1);
return;
}
// Attach the engine to the corresponding session object.
send_attach (session, engine); //将这个engine与session绑定起来,然后同时还会将当前streamEngine绑定到IO线程上,也就是在poller上面注册
// Shut the connecter down.
terminate (); //关闭当前的connector
socket.event_connected (endpoint, fd); //向上层的socket通知连接建立的消息
}
具体干了什么代码很直白的就能看出来吧,这里还可以看到对于建立连接超时也会进行尝试重连接的。。。
好了,到这里如何建立连接就算是比较的清楚了。。那么接下来看看在连接断开之后将会如何进行重连接吧,先来看看连接断开之后会执行啥操作,
这里首先总得知道如何判断底层的channel的连接是不是已经断开了吧,如何来判断呢,嗯,这个有点基础的就应该知道,如果连接已经断开了,那么在channel上read将会返回-1,好了那么我们就知道代码应该从哪里开始看了,嗯,来看streamEngine的in_event方法,看它在read返回-1之后会做啥:
//当底层的chanel有数据可以读取的时候的回调方法
public void in_event () {
if (handshaking)
if (!handshake ())
return;
assert (decoder != null);
boolean disconnection = false;
// If there's no data to process in the buffer...
if (insize == 0) { //如果inbuf里面没有数据需要处理
// Retrieve the buffer and read as much data as possible.
// Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited.
inbuf = decoder.get_buffer (); //从解码器里面获取buf,用于写入读取的数据,因为在已经设置了底层socket的TCP接收缓冲区的大小
insize = read (inbuf); //用于将发送过来的数据写到buf中去,并记录大小
inbuf.flip(); //这里准备从buf里面读取数据了
// Check whether the peer has closed the connection.
if (insize == -1) { //如果是-1的话,表示底层的socket连接已经出现了问题
insize = 0;
disconnection = true; //设置标志位
}
}
// Push the data to the decoder.
int processed = decoder.process_buffer (inbuf, insize); //解析这些读取到的数据
if (processed == -1) {
disconnection = true;
} else {
// Stop polling for input if we got stuck.
if (processed < insize) //如果处理的数据居然还没有读到的数据多,那么取消读取事件的注册
io_object.reset_pollin (handle);
// Adjust the buffer.
insize -= processed; //还剩下没有处理的数据的大小
}
// Flush all messages the decoder may have produced.
session.flush (); //将decoder解析出来的数据交给session
// An input error has occurred. If the last decoded message
// has already been accepted, we terminate the engine immediately.
// Otherwise, we stop waiting for socket events and postpone
// the termination until after the message is accepted.
if (disconnection) { //表示已经断开了连接,那么需要处理一下
if (decoder.stalled ()) {
io_object.rm_fd (handle);
io_enabled = false;
} else {
error ();
}
}
}
嗯,这里可以看到,如果返回-1之后,会设置disconnection标志位,然后还会调用error方法来报错,那么接下来来看看这个error方法做了啥吧:
//报错,那么让高层的ZMQ的socket关闭当前连接
private void error () {
assert (session != null);
socket.event_disconnected (endpoint, handle); //这里可以理解为通知上层的socket,
session.detach (); //这个主要是用于session清理与socket的pipe ,然后还会尝试进行重连接
unplug (); //取消在poller上面的注册
destroy (); //关闭底层的channel,关闭当前
}
其实,如果底层的链接断开了,那么当前这个channel也就无效了,那么当前的streamEngine对象也就无效了,那么要做的事情就是销毁当前的对象,然后还要解除在poller上面的注册,然后还要通知上层的socket,当前的这个链接地址的连接已经断开了。。。当然还要告诉