前面的一篇文章分析了ZeroMQ中最为简单Socket类型,Dealer。。不过觉得这种具体的Socket类型的分析可以留到以后,或者等以后什么时候会用到了再分析再不迟。。。。
但是作为一个消息通信的框架,最重要的还是通信的可靠性,而这其中最最重要的就是连接断开之后的重连接机制。。。
在看具体的重连接机制之前,先来看看ZeroMQ中如何主动的建立于远程的连接吧,先来看看SocketBase中定义的connect方法:
//与远程地址建立连接
public boolean connect (String addr_) {
if (ctx_terminated) {
throw new ZError.CtxTerminatedException();
}
// Process pending commands, if any.
boolean brc = process_commands (0, false);
if (!brc)
return false;
// Parse addr_ string.
URI uri;
try {
uri = new URI(addr_); //构建URI对象
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
String protocol = uri.getScheme(); //获取协议类型
String address = uri.getAuthority();
String path = uri.getPath();
if (address == null)
address = path;
check_protocol (protocol); //检查是否是合格的协议类型
if (protocol.equals(inproc)) { //如果是进程内部的通信
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
Ctx.Endpoint peer = find_endpoint (addr_);
if (peer.socket == null)
return false;
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int sndhwm = 0;
if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
sndhwm = options.sndhwm + peer.options.rcvhwm;
int rcvhwm = 0;
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create a bi-directional pipe to connect the peers.
ZObject[] parents = {this, peer.socket};
Pipe[] pipes = {null, null};
int[] hwms = {sndhwm, rcvhwm};
boolean[] delays = {options.delay_on_disconnect, options.delay_on_close};
Pipe.pipepair (parents, pipes, hwms, delays);
// Attach local end of the pipe to this socket object.
attach_pipe (pipes [0]);
// If required, send the identity of the peer to the local socket.
if (peer.options.recv_identity) {
Msg id = new Msg (options.identity_size);
id.put (options.identity, 0 , options.identity_size);
id.set_flags (Msg.identity);
boolean written = pipes [0].write (id);
assert (written);
pipes [0].flush ();
}
// If required, send the identity of the local socket to the peer.
if (options.recv_identity) {
Msg id = new Msg (peer.options.identity_size);
id.put (peer.options.identity, 0 , peer.options.identity_size);
id.set_flags (Msg.identity);
boolean written = pipes [1].write (id);
assert (written);
pipes [1].flush ();
}
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, pipes [1], false);
// Save last endpoint URI
options.last_endpoint = addr_;
// remember inproc connections for disconnect
inprocs.put(addr_, pipes[0]);
return true;
}
//选择一个比较IO线程,用于部署待会将会创建爱的session
IOThread io_thread = choose_io_thread (options.affinity);
if (io_thread == null) {
throw new IllegalStateException(Empty IO Thread);
}
//创建address对象
Address paddr = new Address (protocol, address);
if (protocol.equals(tcp)) { //如果是tcp的话
paddr.resolved( new TcpAddress () );
paddr.resolved().resolve (
address, options.ipv4only != 0 true : false);
} else if(protocol.equals(ipc)) { //进程间通信
paddr.resolved( new IpcAddress () );
paddr.resolved().resolve (address, true);
}
// Create session.
//创建se