ZeroMQ(java)中连接建立与重连机制(二)

2014-11-24 07:32:15 · 作者: · 浏览: 3
ssion,第一参数是当前session将会依附的IO线程,第二个参数表示需要主动建立连接 SessionBase session = SessionBase.create (io_thread, true, this, options, paddr); assert (session != null); // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. boolean icanhasall = false; if (protocol.equals(pgm) || protocol.equals(epgm)) icanhasall = true; //创建pipe的关联,连接session与当前的socket if (options.delay_attach_on_connect != 1 || icanhasall) { // Create a bi-directional pipe. ZObject[] parents = {this, session}; Pipe[] pipes = {null, null}; int[] hwms = {options.sndhwm, options.rcvhwm}; boolean[] delays = {options.delay_on_disconnect, options.delay_on_close}; Pipe.pipepair (parents, pipes, hwms, delays); // Attach local end of the pipe to the socket object. //将第一个pipe与当前socket关联 attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. //将另外一个pipe与session关联起来,这样session与socket就能够通过pipe通信了 session.attach_pipe (pipes [1]); } // Save last endpoint URI options.last_endpoint = paddr.toString (); add_endpoint (addr_, session); //将这个session与这个地址关联起来 return true; }

这里就主要关注TCP连接的建立部分吧,毕竟在分布式的环境下还是再用TCP,通过前面的文章,我们知道一个Socket下面可能对应了多个连接,而每一个连接其实对应的是一个StreamEngine对象,而每一个StreamEngine对象又都关联了一个Session对象,用于与上层的Socket之间的交互,那么这里其实可以看到代码最主要要做的事情就是创建Session对象,以及Pipe对象啥的。。。。接着再调用add_endpoint方法,用于部署这个session,那么接下来来看看这个方法吧:

    //这里管理地址与session,其实也就记录当前所有的建立连接的地址,以及相对的session
    private void add_endpoint (String addr_, Own endpoint_) {
        //  Activate the session. Make it a child of this socket.
        launch_child (endpoint_);   //部署这个endpoint,这里主要的是要将这个endpoint加入到IO线程
        endpoints.put (addr_, endpoint_);
    }
    

这里其实用于不是session对象,那么对于这个session对象,将会执行process_plug方法,那么来看看这个方法的定义:

    //执行plug命令,如果需要连接的话,那么要开始进行连接
    protected void process_plug () {
        io_object.set_handler(this);  //设置io对象的handler,用于响应io事件
        if (connect) {  //如果这里需要主动与远程建立连接的话,那么启动连接
            start_connecting (false);   //启动连接,false表示不等待
        }
    }

这里首先会设置当前io对象的事件回调,connect属性,在创建session的时候设置的,如果是主动创建的连接那么将会是true,如果是listener接收到的连接,那么将会是false,这里来看看这个方法的定义:

    //如果是connect的话,那么需要调用这个方法来建立连接
    private void start_connecting (boolean wait_) {
        assert (connect);

        //  Choose I/O thread to run connecter in. Given that we are already
        //  running in an I/O thread, there must be at least one available.
        IOThread io_thread = choose_io_thread (options.affinity);  //挑选一个io线程,用于部署待会的TCPConnector
        assert (io_thread != null);

        //  Create the connecter object.

        if (addr.protocol().equals(tcp)) {
            TcpConnecter connecter = new TcpConnecter (
                io_thread, this, options, addr, wait_);
            //alloc_assert (connecter);
            launch_child (connecter);  //部署这个TCPconnector
            return;
        }
        
        if (addr.protocol().equals(ipc)) {
            IpcConnecter connecter = new IpcConnecter (
                io_thread, this, options, addr, wait_);
            //alloc_assert (connecter);
            launch_child (connecter);
            return;
        }
        
        assert (false);
    }

这里传进来了一个参数,这个参数在构建TCPConnector的时候将会被用到,用于表示这个连接的建立是否是延迟的。。这里刚开始建立连接的时候,是false,表示不要延迟,待会看重连接的时候会发现,在重连接中将会使用延迟的连接。。。

这里也可以看到对于具体连接的建立,其实是委托给了TCPConnector对象来做的,它其实是一个工具类。。。

具体它是怎么建立连接的就不详细的列出来了,大概的说一下过程吧:

(1)创建一个socketchannel对象,并将其设置为非阻塞的,然后调用connect方法来建立于远程地址的连接

(2)将socketchannel注册到IO线程的poller上去,并要设置connect事件

(3)对于connect事件的回调要做的事情,其实是在poller对象上解除这个socketchannel的注册,然后创建一个新的streamengin