设为首页 加入收藏

TOP

ChannelPipeline----贯穿io事件处理的大动脉(一)
2019-09-17 15:31:55 】 浏览:41
Tags:ChannelPipeline---- 贯穿 事件 处理 大动脉

ChannelPipeline贯穿io事件处理的大动脉

上一篇,我们分析了NioEventLoop及其相关类的主干逻辑代码,我们知道netty采用线程封闭的方式来避免多线程之间的资源竞争,最大限度地减少并发问题,减少锁的使用,因而能够有效减低线程切换的开销,减少cpu的使用时间。此外,我们还简单分析了netty对于线程组的封装EventLoopGroup,目前一般采用roundRobin的方式在多个线程上均匀地分配channel。通过前面几篇文章的分析,我们已经对channel的初始化,注册到EventLoop上,SingleThreadEventLoop的线程启动过程以及线程中运行的代码逻辑有了一些了解,此外我们也分析了用于处理基于TCP协议的io事件的NioEventLoop类的具体的循环逻辑,通过对代码的详细分析,我们了解了对于connect,write,read,accept事件的不同处理逻辑,但是对于write和read事件的处理逻辑我们并没有分析的很详细,因为这些事件的处理涉及到netty中另一个很重要的模块,ChannelPipeline以及一系列相关的类如Channel, ChannelHandler, ChannelhandlerContext等的理解,netty中的事件处理采用了经典的责任链(responsbility chain)的的设计模式,这种设计模式使得netty的io事件处理框架易于扩展,并且为业务逻辑提供了一个很好的抽象模型,大大降低了netty的使用难度,使得io事件的处理变得更符合思维习惯。
好了,废话了那么多,其实主要是想把前面分析的几篇的文章做一个小结和回顾,然后引出本篇的主题--netty的io事件处理链模式。
因为netty的代码结构相对来说还是很规整,它的模块之间的边界划分比较明确,EventLoop作为io事件的“发源地”,与其交互的对象是Channel类,而ChannelPipeline,ChannelhandlerContext, ChannelHandler等几个类则是与Channel交互,他们并不直接与EventLoop交互。

ChannelPipeline的结构图

首先每一个Channel在初始化的时候就会创建一个ChannelPipeline,这点我们在前面分析NioSocketChannel的初始化时也分析到了。目前ChannelPipeline的实现只有DefaultChannelPipeline一种,所以我们也以DefaultChannelPipeline来分析。DefaultChannelPipeline内部有一个双向链表结构,这个链表的每个节点都是一个AbstractChannelHandlerContext类型的节点,DefaultChannelPipeline刚初始化时就会创建两个初始节点,分别是HeadContext和TailContext,这两个节点也并不完全是标记节点,他们都有各自实际的作用,

  • HeadContext,实现了bind,connect,disconnect,close,write,flush等等几个方法,基本都是通过直接调用unsafe的相关方法实现的。而对于其他的方法基本都是通过调用AbstractChannelHandlerContext的fire方法将事件传给下一个节点。
  • TailContext, 主要用于处理写数据几乎没有实现任何逻辑,它的功能几乎全部继承自AbstractChannelHandlerContext,而AbstractChannelHandlerContext对于大部分事件处理的实现都是简单地将事件向下一个节点传递。注意,这里下一个节点不一定是前一个还是后一个,要根据具体事件类型或者具体的操作而定,对于ChannelOutboundInvoker接口中的方法都是从尾节点向首节点传递事件,而对于ChannelInboundInvoker接口中的方法都是从首节点往尾节点传递。我们可以形象地理解为,首节点是最靠近socket的,而尾节点是最原理socket的,所以有数据进来时,产生的读事件最先从首节点开始向后传递,当有写数据的动作时,则会从尾节点向头结点传递。

下面,我们以两个最重要的事件读事件和写事件,来分析netty的这种链式处理结构到底是怎么运转的。

读事件

首先,我们需要找到一个产生读事件并调用相关方法使得读事件开始传递的例子,很自然我们应该想到在EventLoop中会产生读事件。
如下,就是NioEventLoop中对于读事件的处理,通过调用NioUnsafe.read方法

       // 处理read和accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }

我们继续看NioByteUnsafe.read方法,这个方法我们之前在分析NioEventLoop事件处理逻辑时提到过,这个方法首先会通过缓冲分配器分配一个缓冲,然后从channel(也就是socket)中将数据读到缓冲中,每读一个缓冲,就会触发一个读事件,我们看具体的触发读事件的调用:

            do {
                // 分配一个缓冲
                byteBuf = allocHandle.allocate(allocator);
                // 将通道的数据读取到缓冲中
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                // 如果没有读取到数据,说明通道中没有待读取的数据了,
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    // 因为没读取到数据,所以应该释放缓冲
                    byteBuf.release();
                    byteBuf = null;
                    // 如果读取到的数据量是负数,说明通道已经关闭了
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                // 更新Handle内部的簿记量
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 向channel的处理器流水线中触发一个事件,
                // 让取到的数据能够被流水线上的各个ChannelHandler处理
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
                // 这里根据如下条件判断是否继续读:
                // 上一次读取到的数据量大于0,并且读取到的数据量等于分配的缓冲的最大容量,
                // 此时说明通道中还有待读取的数据
            } while (allocHandle.continueReading());

为了代码逻辑的完整性,我这里把整个循环的代码都贴上来,其实我们要关注的仅仅是pipeline.fireChannelRead(byteBuf)这一句,好了,现在我们找到Channel

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇如何开发优质的 Flutter App:应.. 下一篇Kubernetes

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目