这里就主要关注TCP连接的建立部分吧,毕竟在分布式的环境下还是再用TCP,通过前面的文章,我们知道一个Socket下面可能对应了多个连接,而每一个连接其实对应的是一个StreamEngine对象,而每一个StreamEngine对象又都关联了一个Session对象,用于与上层的Socket之间的交互,那么这里其实可以看到代码最主要要做的事情就是创建Session对象,以及Pipe对象啥的。。。。接着再调用add_endpoint方法,用于部署这个session,那么接下来来看看这个方法吧:
//这里管理地址与session,其实也就记录当前所有的建立连接的地址,以及相对的session
private void add_endpoint (String addr_, Own endpoint_) {
// Activate the session. Make it a child of this socket.
launch_child (endpoint_); //部署这个endpoint,这里主要的是要将这个endpoint加入到IO线程
endpoints.put (addr_, endpoint_);
}
这里其实用于不是session对象,那么对于这个session对象,将会执行process_plug方法,那么来看看这个方法的定义:
//执行plug命令,如果需要连接的话,那么要开始进行连接
protected void process_plug () {
io_object.set_handler(this); //设置io对象的handler,用于响应io事件
if (connect) { //如果这里需要主动与远程建立连接的话,那么启动连接
start_connecting (false); //启动连接,false表示不等待
}
}
这里首先会设置当前io对象的事件回调,connect属性,在创建session的时候设置的,如果是主动创建的连接那么将会是true,如果是listener接收到的连接,那么将会是false,这里来看看这个方法的定义:
//如果是connect的话,那么需要调用这个方法来建立连接
private void start_connecting (boolean wait_) {
assert (connect);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
IOThread io_thread = choose_io_thread (options.affinity); //挑选一个io线程,用于部署待会的TCPConnector
assert (io_thread != null);
// Create the connecter object.
if (addr.protocol().equals(tcp)) {
TcpConnecter connecter = new TcpConnecter (
io_thread, this, options, addr, wait_);
//alloc_assert (connecter);
launch_child (connecter); //部署这个TCPconnector
return;
}
if (addr.protocol().equals(ipc)) {
IpcConnecter connecter = new IpcConnecter (
io_thread, this, options, addr, wait_);
//alloc_assert (connecter);
launch_child (connecter);
return;
}
assert (false);
}
这里传进来了一个参数,这个参数在构建TCPConnector的时候将会被用到,用于表示这个连接的建立是否是延迟的。。这里刚开始建立连接的时候,是false,表示不要延迟,待会看重连接的时候会发现,在重连接中将会使用延迟的连接。。。
这里也可以看到对于具体连接的建立,其实是委托给了TCPConnector对象来做的,它其实是一个工具类。。。
具体它是怎么建立连接的就不详细的列出来了,大概的说一下过程吧:
(1)创建一个socketchannel对象,并将其设置为非阻塞的,然后调用connect方法来建立于远程地址的连接
(2)将socketchannel注册到IO线程的poller上去,并要设置connect事件
(3)对于connect事件的回调要做的事情,其实是在poller对象上解除这个socketchannel的注册,然后创建一个新的streamengin