ZeroMQ(java)之I/O线程的实现与组件间的通信(一)

2014-11-24 08:20:05 · 作者: · 浏览: 0

算是开始读ZeroMQ(java)的代码实现了吧,现在有了一个大体的了解,看起来实现是比较的干净的,抽象什么的不算复杂。。。

这里先来看看它的I/O线程的实现吧,顺带看看是如何实现组件的通信的。。。。

首先要搞清楚I/O线程的实现,就先要弄懂一个类型,Poller(zmq.Poller.java),可以将其看成是对selector的一个封装,同时它还要管理定时事件,看了这么多代码,发现基本上都是在实现I/Oselect的地方完成了定时的实现。。。。

好了,不说太多闲话了,来看看它的继承体系吧:

\


这里还将依赖关系也标出来了,首先继承自PollerBase抽象类,然后实现了Runnable接口,自己还会创建一个Thread对象。。。看了这个图,基本上就已经能够知道Poller的运行原理了吧。。。。

这里先来看看PollerBase的实现吧,它其实主要是用来管理定时的,那么先来看看他的一些重要的属性和定义:

    private final AtomicInteger load;   //这个load其实就是当前poller里面注册的channel的数量
    
    //这里是要注册的超时是事件
    private final class TimerInfo {
        IPollEvents sink;  //事件回调
        int id;
        
        public TimerInfo(IPollEvents sink_, int id_) {
            sink = sink_;
            id = id_;
        }
    }
    private final Map
  
    timers;   //这里记录所有的超时对象,key是时间
    private final Map
   
     addingTimers; //等待加入的超时事件
   
  

前面的一个原子Integer是用于记录负载的,用于记录当前poller里面一共注册了多少I/O对象。。。然后是超时事件的定义,sink是超时的事件回调函数,里面有相应的方法,timer就记录了所有的超时事件,addingTimers是需要加入的超时事件。。这里的key都是超时的时间,value就是超时对象了。。。

这里就来看两个主要的方法就好了吧,先来看看如何加入超时事件:

    //添加一个超时事件
    public void add_timer (long timeout_, IPollEvents sink_, int id_) {
        long expiration = Clock.now_ms () + timeout_;   //计算超时的时间
        TimerInfo info = new TimerInfo(sink_, id_);  //创建超时对象
        addingTimers.put(expiration, info);  //将其添加到adding里面去

    }

代码应该很简单能够看明白吧,第一个参数是超时时间,第二个参数是回调方法,第三个参数是ID,首先加上当前的时间就算出了超时的时间,然后创建超时对象,这里先是将其放入了addingTimers里面,而不是直接放到了timer里面,。。。

那么接下来来看看如何执行所有的超时的方法吧:

 //执行所有的超时事件,返回下一个超时还剩下的时间
    protected long execute_timers() {
        if (!addingTimers.isEmpty()) {  //如果当前还有需要添的超时时间,那么需要将其添加进去
            timers.putAll(addingTimers);
            addingTimers.clear();
        }
        //没有超时事件
        if (timers.isEmpty())
            return 0L;

        //获取当前的时间
        long current = Clock.now_ms ();

        //遍历所有的超时时间,这里是从最小的开始的
        Iterator
  
   > it = timers.entrySet().iterator();
        while (it.hasNext()) {

            Entry 
   
     o = it.next(); // If we have to wait to execute the item, same will be true about // all the following items (multimap is sorted). Thus we can stop // checking the subsequent timers and return the time to wait for // the next timer (at least 1ms). //如果超时的时间大于当前的时间,那么表示还没有超时, if (o.getKey() > current) { return o.getKey() - current; //返回下一个超时还剩下的时间 } // Trigger the timer. //执行超时方法 o.getValue().sink.timer_event (o.getValue().id); // Remove it from the list of active timers. it.remove(); } if (!addingTimers.isEmpty()) return execute_timers(); // There are no more timers. return 0L; //如果是0 的话,表示没有timer执行了 } } 
   
  

应该代码也还算比较好理解吧,这里可以看到将addingTimers里面的都放到了timers里面。。。然后遍历所有的超时对象,并执行他们的超时回调,知道一个超时时间还没有到,最后返回的是下一个超时事件还剩下多长的时间。。。

好了,那么接下来来看看Poller类型的实现吧,先来看看它的重要定义:

	//在当前poller里面注册的封装。。。
    private static class PollSet {
        protected IPollEvents handler;   //事件的回调
        protected SelectionKey key;   //注册之后的key
        protected int ops;    //注册的事件
        protected boolean cancelled;   //是否已经取消
        
        protected PollSet(IPollEvents handler) {
            this.handler = handler;
            key = null;
            cancelled = false;
            ops = 0;
        }
    }
    final private Map
  
    fd_table;   //记录所有的注册,key是channel

    //  If true, there's at least one retired event source.
    private boolean retired;    //当前注册的对象是否有更新,如果有更新的话,在执行select之前需要先更新注册

    //  If true, thread is in the process of shutting down.
    volatile private boolean stopping;    //如果是true的话,那么执行线程将会停止
    volatile private boolean stopped;   //是否已经停止
    
    private Thread worker;   //worker线程
    private Selector selector;   //selector
    final private String name;   //名字
  

这里显示定义了一个嵌套类,所有需要注册到selector上的channel都会先构建这个对象,将其当做附件