ZeroMQ(java)中连接建立与重连机制(三)

2014-11-24 07:32:15 · 作者: · 浏览: 2
e对象来包装这个socketchannel,然后再将这个streamEngine对象与刚刚的session对象关联起来。。

这里我们可以来看看这个connect的事件回调方法做了什么事情吧:

    //连接建立的事件回调,其实也有可能是连接超时
    public void connect_event (){
        boolean err = false;
        SocketChannel fd = null;
        try {
            fd = connect ();   //获取已经建立好连接的channel
        } catch (ConnectException e) {
            err = true;
        } catch (SocketException e) {
            err = true;
        } catch (SocketTimeoutException e) {
            err = true;
        } catch (IOException e) {
            throw new ZError.IOException(e);
        }

        io_object.rm_fd (handle);  //可以将当前的IOObject从poller上面移除了,同时代表这个TCPConnector也就失效了,
        handle_valid = false;
        
        if (err) {
            //  Handle the error condition by attempt to reconnect.
            close ();   
            add_reconnect_timer();  //尝试重新建立连接
            return;
        }
        
        handle = null;
        
        try {
            
            Utils.tune_tcp_socket (fd);
            Utils.tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }

        //  Create the engine object for this connection.
        
        //创建streamEngine对象,重新封装建立好连接的channel 
        StreamEngine engine = null;
        try {
            engine = new StreamEngine (fd, options, endpoint);
        } catch (ZError.InstantiationException e) {
            socket.event_connect_delayed (endpoint, -1);
            return;
        }

        //  Attach the engine to the corresponding session object.
        send_attach (session, engine);  //将这个engine与session绑定起来,然后同时还会将当前streamEngine绑定到IO线程上,也就是在poller上面注册

        //  Shut the connecter down.
        terminate ();  //关闭当前的connector

        socket.event_connected (endpoint, fd);  //向上层的socket通知连接建立的消息
    }

具体干了什么代码很直白的就能看出来吧,这里还可以看到对于建立连接超时也会进行尝试重连接的。。。

好了,到这里如何建立连接就算是比较的清楚了。。那么接下来看看在连接断开之后将会如何进行重连接吧,先来看看连接断开之后会执行啥操作,

这里首先总得知道如何判断底层的channel的连接是不是已经断开了吧,如何来判断呢,嗯,这个有点基础的就应该知道,如果连接已经断开了,那么在channel上read将会返回-1,好了那么我们就知道代码应该从哪里开始看了,嗯,来看streamEngine的in_event方法,看它在read返回-1之后会做啥:

    //当底层的chanel有数据可以读取的时候的回调方法
    public void in_event ()  {
        if (handshaking)
            if (!handshake ())
                return;
        
        assert (decoder != null);
        boolean disconnection = false;

        //  If there's no data to process in the buffer...
        if (insize == 0) {  //如果inbuf里面没有数据需要处理

            //  Retrieve the buffer and read as much data as possible.
            //  Note that buffer can be arbitrarily large. However, we assume
            //  the underlying TCP layer has fixed buffer size and thus the
            //  number of bytes read will be always limited.
            inbuf = decoder.get_buffer ();  //从解码器里面获取buf,用于写入读取的数据,因为在已经设置了底层socket的TCP接收缓冲区的大小
            insize = read (inbuf);  //用于将发送过来的数据写到buf中去,并记录大小
            inbuf.flip();  //这里准备从buf里面读取数据了

            //  Check whether the peer has closed the connection.
            if (insize == -1) {  //如果是-1的话,表示底层的socket连接已经出现了问题
                insize = 0;
                disconnection = true;  //设置标志位
            }
        }

        //  Push the data to the decoder.
        int processed = decoder.process_buffer (inbuf, insize);  //解析这些读取到的数据

        if (processed == -1) {
            disconnection = true;
        } else {

            //  Stop polling for input if we got stuck.
            if (processed < insize)  //如果处理的数据居然还没有读到的数据多,那么取消读取事件的注册
                io_object.reset_pollin (handle);

            //  Adjust the buffer.
            insize -= processed;  //还剩下没有处理的数据的大小
        }

        //  Flush all messages the decoder may have produced.
        session.flush ();  //将decoder解析出来的数据交给session

        //  An input error has occurred. If the last decoded message
        //  has already been accepted, we terminate the engine immediately.
        //  Otherwise, we stop waiting for socket events and postpone
        //  the termination until after the message is accepted.
        if (disconnection) {   //表示已经断开了连接,那么需要处理一下
            if (decoder.stalled ()) {
                io_object.rm_fd (handle);
                io_enabled = false;
            } else {
                error ();
        
            }
        }
    }

嗯,这里可以看到,如果返回-1之后,会设置disconnection标志位,然后还会调用error方法来报错,那么接下来来看看这个error方法做了啥吧:

    //报错,那么让高层的ZMQ的socket关闭当前连接
    private void error ()  {
        assert (session != null);
        socket.event_disconnected (endpoint, handle);  //这里可以理解为通知上层的socket,
        session.detach ();   //这个主要是用于session清理与socket的pipe ,然后还会尝试进行重连接
        unplug ();  //取消在poller上面的注册
        destroy ();  //关闭底层的channel,关闭当前
    }

其实,如果底层的链接断开了,那么当前这个channel也就无效了,那么当前的streamEngine对象也就无效了,那么要做的事情就是销毁当前的对象,然后还要解除在poller上面的注册,然后还要通知上层的socket,当前的这个链接地址的连接已经断开了。。。当然还要告诉