然后是fd_table,这个应该知道是干嘛用的吧,用于关联注册的channel对象与其的PollSet对象。。。
这里的retired用于标识当前的注册的channel什么的是否有更新。。。接下来的重要属性还有thread,这个是干嘛应该很清楚吧,还有一个selector就不多说了。。。
接下来来看看如何在poller对象上面注册channel吧,有几个比较重要的方法:
//用于在当前的集合里面添加需要注册的channel,第一个参数是channel,第二个参数是事件回调
public final void add_fd (SelectableChannel fd_, IPollEvents events_) {
fd_table.put(fd_, new PollSet(events_)); //直接把放到map里面就好了
adjust_load (1); //增加load值,这里所谓的负载其实就是在当前poller里面注册的channel的数量
}
//在key上面注册事件,如果negate为true的话,那么表示是取消事件
private final void register (SelectableChannel handle_, int ops, boolean negate) {
PollSet pollset = fd_table.get(handle_); //获取pollset对象
if (negate) {
pollset.ops = pollset.ops &~ ops; //取反,相当于取消事件
} else {
pollset.ops = pollset.ops | ops; //注册事件
}
if (pollset.key != null) { //如果有key了,那么表示已经注册到selector上面了,那么只需要更新key就好了
pollset.key.interestOps(pollset.ops);
} else {
retired = true;
}
}
这里首先需要调用add_fd方法,channel加入进去,然后再调用register方法注册相应的事件,不知道为啥要这么弄。。直接一个方法实现不就好了么。。可能有一些细节的东西我还不太清楚吧,不多说这个了。。
好了,接下来来看看它的run方法吧:
//poller的执行流程
public void run () {
int returnsImmediately = 0;
while (!stopping) {
long timeout = execute_timers (); //执行所有的超时,并且获取下一个超时的时间
if (retired) { //这里表示注册的东西有更新
Iterator
> it = fd_table.entrySet ().iterator ();
while (it.hasNext ()) { //遍历所有需要注册的
Map.Entry
entry = it.next (); SelectableChannel ch = entry.getKey (); //获取channel PollSet pollset = entry.getValue (); //获取pollset if (pollset.key == null) { //这里没有key的话,表示当前channel并没有注册到selector上面去 try { pollset.key = ch.register(selector, pollset.ops, pollset.handler); //注册,这里注册的附件居然是事件的回调函数 } catch (ClosedChannelException e) { } } if (pollset.cancelled || !ch.isOpen()) { //如果是取消注册,那么直接取消掉就可以了 if(pollset.key != null) { pollset.key.cancel(); } it.remove (); } } retired = false; } // Wait for events. int rc; long start = System.currentTimeMillis (); //select之前的时间 try { rc = selector.select (timeout); } catch (IOException e) { throw new ZError.IOException (e); } if (rc == 0) { //出错啦,好像 // Guess JDK epoll bug if (timeout == 0 || System.currentTimeMillis () - start < timeout / 2) returnsImmediately ++; else returnsImmediately = 0; if (returnsImmediately > 10) { rebuildSelector (); //重建selector returnsImmediately = 0; } continue; } Iterator
it = selector.selectedKeys().iterator(); //所有select出来的key while (it.hasNext()) { //遍历 SelectionKey key = it.next(); IPollEvents evt = (IPollEvents) key.attachment(); it.remove(); try { //接下来就是判断事件的类型执行相应的方法就好了 if (key.isReadable() ) { //有数据可以读取了 evt.in_event(); } else if (key.isAcceptable()) { //有新的连接进来了 evt.accept_event(); } else if (key.isConnectable()) { //连接建立 evt.connect_event(); } if (key.isWritable()) { //可写 evt.out_event(); } } catch (CancelledKeyException e) { // channel might have been closed } } } stopped = true; }
这个应该很容易看懂吧,首先执行了所有超时的事件,然后如果有注册的channel更新的话,需要重新更新这些注册,然后就可以执行select方法了,接着遍历出所有select的key,然后判断事件的类型,执行相应的回调方法就好了。。。
最后来看看它的start方法:
//启动,这里主要是创建一个线程,然后开始运行
public void start() {
worker = new Thread(this, name); //创建thread,
worker.start(); //启动这个执行线程
}
好吧,简单吧,创建一个线程,然后启动就好了,这里执行的就是run方法。。。。
好了,到这里整个poller的实现和其运行基本上就算是搞清楚了。。。而且可以知道poller对象才是真的I/O线程的持有者。。。。
接下来来介绍另外一个类型:Mailbox,每一个I/O线程都会有自己的mailbox,而且连接也会有自己的mailbox,可以向mailbox里面发送命令,然后让其执行。。。这里可以理解为mailbox是命令的接收器,ZeroMQ就是用这个来实现组件之间的通信的。。。。
先来看看他的一些重要的属性定义吧:
private final YPipecpipe; //这名字太唬人了,其实就是一