ZeroMQ(java)中连接建立与重连机制(一)

2014-11-24 07:32:15 · 作者: · 浏览: 4

前面的一篇文章分析了ZeroMQ中最为简单的Socket类型——Dealer。不过我觉得这种具体Socket类型的分析可以留到以后,等什么时候用到了再分析也不迟。

但是作为一个消息通信的框架,最重要的还是通信的可靠性,而这其中最关键的就是连接断开之后的重连机制。

在看具体的重连机制之前,先来看看ZeroMQ中如何主动地建立与远程的连接,也就是SocketBase中定义的connect方法:

 //与远程地址建立连接
    public boolean connect (String addr_) {
        if (ctx_terminated) {
            throw new ZError.CtxTerminatedException();
        }

        //  Process pending commands, if any.
        boolean brc = process_commands (0, false);
        if (!brc)
            return false;

        //  Parse addr_ string.
        URI uri;
        try {
            uri = new URI(addr_);   //构建URI对象
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
        
        String protocol = uri.getScheme();   //获取协议类型
        String address = uri.getAuthority();
        String path = uri.getPath();
        if (address == null)
            address = path;

        check_protocol (protocol);  //检查是否是合格的协议类型

        if (protocol.equals(inproc)) {    //如果是进程内部的通信

            //  TODO: inproc connect is specific with respect to creating pipes
            //  as there's no 'reconnect' functionality implemented. Once that
            //  is in place we should follow generic pipe creation algorithm.

            //  Find the peer endpoint.
            Ctx.Endpoint peer = find_endpoint (addr_);
            if (peer.socket == null)
                return false;
            // The total HWM for an inproc connection should be the sum of
            // the binder's HWM and the connector's HWM.
            int  sndhwm = 0;
            if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
                sndhwm = options.sndhwm + peer.options.rcvhwm;
            int  rcvhwm = 0;
            if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
                rcvhwm = options.rcvhwm + peer.options.sndhwm;

            //  Create a bi-directional pipe to connect the peers.
            ZObject[] parents = {this, peer.socket};
            Pipe[] pipes = {null, null};
            int[] hwms = {sndhwm, rcvhwm};
            boolean[] delays = {options.delay_on_disconnect, options.delay_on_close};
            Pipe.pipepair (parents, pipes, hwms, delays);

            //  Attach local end of the pipe to this socket object.
            attach_pipe (pipes [0]);

            //  If required, send the identity of the peer to the local socket.
            if (peer.options.recv_identity) {
                Msg id = new Msg (options.identity_size);
                id.put (options.identity, 0 , options.identity_size);
                id.set_flags (Msg.identity);
                boolean written = pipes [0].write (id);
                assert (written);
                pipes [0].flush ();
            }
            
            //  If required, send the identity of the local socket to the peer.
            if (options.recv_identity) {
                Msg id = new Msg (peer.options.identity_size);
                id.put (peer.options.identity, 0 , peer.options.identity_size);
                id.set_flags (Msg.identity);
                boolean written = pipes [1].write (id);
                assert (written);
                pipes [1].flush ();
            }

            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, pipes [1], false);

            // Save last endpoint URI
            options.last_endpoint = addr_;

            // remember inproc connections for disconnect
            inprocs.put(addr_, pipes[0]);

            return true;
        }

        //选择一个合适的IO线程,用于部署稍后将要创建的session
        IOThread io_thread = choose_io_thread (options.affinity);
        if (io_thread == null) {
            throw new IllegalStateException(Empty IO Thread);
        }
        //创建address对象
        Address paddr = new Address (protocol, address);

        if (protocol.equals(tcp)) {  //如果是tcp的话
            paddr.resolved( new  TcpAddress () );
            paddr.resolved().resolve (
                address, options.ipv4only != 0   true : false);
        } else if(protocol.equals(ipc)) {  //进程间通信
            paddr.resolved( new IpcAddress () );
            paddr.resolved().resolve (address, true);
        }
        //  Create session.
        //创建se