设为首页 加入收藏

TOP

剥开比原看代码07:比原节点收到“请求区块数据”的信息后如何应答?(一)
2018-10-19 15:51:09 】 浏览:115
Tags:代码 节点 收到 请求 区块 数据 信息 如何 应答

作者:freewind

比原项目仓库:

Github地址:https://github.com/Bytom/bytom

Gitee地址:https://gitee.com/BytomBlockchain/bytom

在上一篇,我们知道了比原是如何把“请求区块数据”的信息BlockRequestMessage发送给peer节点的,那么本文研究的重点就是,当peer节点收到了这个信息,它将如何应答?

那么这个问题如果细分的话,也可以分为三个小问题:

  1. 比原节点是如何收到对方发过来的信息的?
  2. 收到BlockRequestMessage后,将会给对方发送什么样的信息?
  3. 这个信息是如何发送出去的?

我们先从第一个小问题开始。

比原节点是如何接收对方发过来的信息的?

如果我们在代码中搜索BlockRequestMessage,会发现只有在ProtocolReactor.Receive方法中针对该信息进行了应答。那么问题的关键就是,比原是如何接收对方发过来的信息,并且把它转交给ProtocolReactor.Receive的。

如果我们对前一篇《比原是如何把请求区块数据的信息发出去的》有印象的话,会记得比原在发送信息时,最后会把信息写入到MConnection.bufWriter中;与之相应的,MConnection还有一个bufReader,用于读取数据,它也是与net.Conn绑定在一起的:

p2p/connection.go#L114-L118

func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
    mconn := &MConnection{
        conn:        conn,
        bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
        bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),

(其中minReadBufferSize的值为常量1024

所以,要读取对方发来的信息,一定会读取bufReader。经过简单的搜索,我们发现,它也是在MConnection.Start中启动的:

p2p/connection.go#L152-L159

func (c *MConnection) OnStart() error {
    // ...
    go c.sendRoutine()
    go c.recvRoutine()
    // ...
}

其中的c.recvRoutine()就是我们本次所关注的。它上面的c.sendRoutine是用来发送的,是前一篇文章中我们关注的重点。

继续c.recvRoutine()

p2p/connection.go#L403-L502

func (c *MConnection) recvRoutine() {
    // ...
    for {
        c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)

        // ...

        pktType := wire.ReadByte(c.bufReader, &n, &err)
        c.recvMonitor.Update(int(n))
        // ...

        switch pktType {
        // ...
        case packetTypeMsg:
            pkt, n, err := msgPacket{}, int(0), error(nil)
            wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
            c.recvMonitor.Update(int(n))
            // ...
            channel, ok := c.channelsIdx[pkt.ChannelID]
            // ...
            msgBytes, err := channel.recvMsgPacket(pkt)
            // ...
            if msgBytes != nil {
                // ...
                c.onReceive(pkt.ChannelID, msgBytes)
            }
            // ...
        }
    }
    // ...
}

经过简化以后,这个方法分成了三块内容:

  1. 第一块就限制接收速率,以防止恶意结点突然发送大量数据把节点撑死。跟发送一样,它的限制是500K/s
  2. 第二块是从c.bufReader中读取出下一个数据包的类型。它的值目前有三个,两个跟心跳有关:packetTypePingpacketTypePong,另一个表示是正常的信息数据类型packetTypeMsg,也是我们需要关注的
  3. 第三块就是继续从c.bufReader中读取出完整的数据包,然后根据它的ChannelID找到相应的channel去处理它。ChannelID有两个值,分别是BlockchainChannelPexChannel,我们目前只需要关注前者即可,它对应的reactor是ProtocolReactor。当最后调用c.onReceive(pkt.ChannelID, msgBytes)时,读取的二进制数据msgBytes就会被ProtocolReactor.Receive处理

我们的重点是看第三块内容。首先是channel.recvMsgPacket(pkt),即通道是怎么从packet包里读取到相应的二进制数据的呢?

p2p/connection.go#L667-L682

func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
    // ...
    ch.recving = append(ch.recving, packet.Bytes...)
    if packet.EOF == byte(0x01) {
        msgBytes := ch.recving
        // ...
        ch.recving = ch.recving[:0]
        return msgBytes, nil
    }
    return nil, nil
}

这个方法我去掉了一些错误检查和关于性能方面的注释,有兴趣的同学可以点接上方的源代码查看,这里就忽略了。

这段代码主要是利用了一个叫recving的通道,把packet中持有的字节数组加到它后面,然后再判断该packet是否代表整个信息结束了,如果是的话,则把ch.recving的内容完整返回,供调用者处理;否则的话,返回一个nil,表示还没拿完,暂时处理不了。在前一篇文章中关于发送数据的地方可以与这里对应,只不过发送方要麻烦的多,需要三个通道sendQueuesendingsend才能实现,这边接收方就简单了。

然后回到前面的方法MConnection.recvRoutine,我们继续看最后的c.onReceive调用。这个onReceive实际上是一个由别人赋值给该channel的一个函数,它位于MConnection创建的地方:

p2p/peer

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Go 跨域请求问题 下一篇剥开比原看代码03:比原是如何监..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目