设为首页 加入收藏

TOP

HDFS源码分析DataXceiver之读数据块
2019-04-14 12:09:34 】 浏览:62
Tags:HDFS 源码 分析 DataXceiver 数据
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lipeng_bigdata/article/details/50859223

《HDFS源码分析DataXceiver之整体流程》一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver。并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理。而决定什么样的操作符op该调用何种方法的逻辑,则是在DataXceiver线程父类Receiver的processOp()方法中实现的,代码如下:

  /** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
	
	// 通过调用相应的方法处理操作符
    switch(op) {
    case READ_BLOCK:// 读数据块调用opReadBlock()方法
      opReadBlock();
      break;
    case WRITE_BLOCK:// 写数据块调用opWriteBlock()方法
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:// 替换数据块调用opReplaceBlock()方法
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:// 复制数据块调用REPLACE()方法
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:// 数据块检验调用opBlockChecksum()方法
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:// 移动数据块调用opTransferBlock()方法
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }
接下来的几篇文章,我们将依次为大家介绍读数据块、写数据块、替换数据块、复制数据块、移动数据块等具体数据读写请求的处理。

那么今天,我们首先来看下第一种数据读写请求--读数据块READ_BLOCK,它是通过调用opReadBlock()方法完成的,我们先看下这个方法的代码:

  /** Receive OP_READ_BLOCK */
  private void opReadBlock() throws IOException {
    
	// 解析输入流,得到读数据块消息协议OpReadBlockProto,即proto
	OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
    
	// 创建TraceScope类型的traceScope
	TraceScope traceScope = continueTraceSpan(proto.getHeader(),
        proto.getClass().getSimpleName());
    
    try {
    	
      // 调用readBlock()方法,完成读数据块操作
      // 从读数据块消息协议OpReadBlockProto中分别获得需要读取的数据块block、访问令牌blockToken、客户端名称clientName、数据块读取的起始偏移量blockOffset、
      // 数据读取的长度length、是否发送块校验sendChecksum、缓存策略CachingStrategy类型的cachingStrategy
      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() 
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()));
    } finally {
    	
      // 关闭traceScope
      if (traceScope != null) traceScope.close();
    }
  }
整个处理流程非常简单。首先,解析输入流,得到读数据块消息协议OpReadBlockProto,即proto,并创建TraceScope类型的traceScope;然后从读数据块消息协议proto中解析出读数据块的各种参数,比如需要读取的数据块block、访问令牌blockToken、客户端名称clientName、数据块读取的起始偏移量blockOffset、数据块读取的长度length、是否发送块校验sendChecksum、缓存策略CachingStrategy类型的cachingStrategy等,利用这些参数调用子类DataXceiver线程的readBlock()方法,进行读数据块的处理,最终关闭traceScope,整个数据块读取过程完毕。
我们再来看下其中涉及的部分细节。首先,在我们要概括性的讲解读数据块消息协议OpReadBlockProto前,我们先看下对于输入流是怎么处理的,答案就在类PBHelper中的vintPrefixed()方法中,其代码如下:
  public static InputStream vintPrefixed(final InputStream input)
      throws IOException {
	  
	// 从输入流input中读入第一个字节Byte
    final int firstByte = input.read();
    if (firstByte == -1) {
      throw new EOFException("Premature EOF: no length prefix available");
    }

    // CodedInputStream用来读取和解码协议消息字段。
    // Varint是一种数值压缩存储方法
    // readRawVarint32()方法从输入流中读取一个原始的Varint,并且,如果高于32位,丢弃之。
    // firstByte是为了告诉CodedInputStream已经从输入流input中读取了1个字节
    // 返回结果为int类型的消息大小
    int size = CodedInputStream.readRawVarint32(firstByte, input);
    
    // 确保消息大小必须大于0
    assert size >= 0;
    
    // 将输入流input包装成ExactSizeInputStream,从该输入流中只能读取size大小的数据
    // ExactSizeInputStream是一种从其他输入流中读取固定大小数据的输入流。
    return new ExactSizeInputStream(input, size);
  }
首先呢,从输入流input中读入第一个字节Byte,然后调用CodedInputStream的readRawVarint32()方法,获取请求内容的大小。CodedInputStream用来读取和解码协议消息字段。Varint是一种数值压缩存储方法。readRawVarint32()方法从输入流中读取一个原始的Varint,并且,如果高于32位,丢弃之。firstByte是为了告诉CodedInputStream已经从输入流input中读取了1个字节,返回结果为int类型的消息大小,同时确保消息大小必须大于0。最后,将输入流input包装成ExactSizeInputStream,从该输入流中只能读取size大小的数据,ExactSizeInputStream是一种从其他输入流中读取固定大小数据的输入流。

接下来,我们再说下解析输入流,得到读数据块消息协议OpReadBlockProto。这个OpReadBlockProto是什么呢?它是谷歌开源的Protobuf在HDFS中定义的进行数据传输时的一种消息协议,其消息格式的定义在文件datatransfer.proto中,内容如下:

message OpReadBlockProto {
  required ClientOperationHeaderProto header = 1;
  required uint64 offset = 2;
  required uint64 len = 3;
  optional bool sendChecksums = 4 [default = true];
  optional CachingStrategyProto cachingStrategy = 5;
}
其中,header、offset、len为必须的,因为它们使用了关键字required,而剩余两个sendChecksums、cachingStrategy则由于使用了关键字optional,所以为可选的。并且,header为ClientOperationHeaderProto类型,而ClientOperationHeaderProto也是一种消息格式,定义如下:

message ClientOperationHeaderProto {
  required BaseHeaderProto baseHeader = 1;
  required string clientName = 2;
}
其中,baseHeader还是Protobuf定义的一种消息格式,其名称为BaseHeaderProto,其定义如下:

message BaseHeaderProto {
  required ExtendedBlockProto block = 1;
  optional hadoop.common.TokenProto token = 2;
  optional DataTransferTraceInfoProto traceInfo = 3;
}
它包含了数据块block,即ExtendedBlockProto,所以,在获得读数据块消息协议OpReadBlockProto之后,调用readBlock()方法之前,我们可以使用如下语句:

PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()
来获得readBlock()方法需要使用的参数ExtendedBlock。读数据块消息协议中的其他字段不再多一一介绍,读者可自行分析。

最后,我们来看下读取数据块的readBlock()方法,其代码如下:

  @Override
  public void readBlock(final ExtendedBlock block,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {
    
	// 将请求中的客户端名称clientName赋值给previousOpClientName
	previousOpClientName = clientName;

	// 获取输出流baseStream,即socketOut
    OutputStream baseStream = getOutputStream();
    
    // 将输出流baseStream依次包装成BufferedOutputStream、DataOutputStream,
    // 其缓冲区大小取参数io.file.buffer.size的一半,
    // 参数未配置的话默认为512,且最大也不能超过512
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
    
    // 访问权限检查
    checkAccess(out, true, block, blockToken,
        Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
  
    // send the block
    // 发送数据块
    
    BlockSender blockSender = null;
    
    // 获取数据节点注册信息DatanodeRegistration
    DatanodeRegistration dnR = 
      datanode.getDNRegistrationForBP(block.getBlockPoolId());
    final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d",
            dnR.getDatanodeUuid(), block, "%d")
        : dnR + " Served block " + block + " to " +
            remoteAddress;

    // 更新当前线程名称:Sending block...
    updateCurrentThreadName("Sending block " + block);
    try {
      try {
    	  
    	// 构造数据块发送器BlockSender对象blockSender
    	// 构造时,需要对应数据块block、数据在块中的起始位置blockOffset、读取数据的长度length等信息
        blockSender = new BlockSender(block, blockOffset, length,
            true, false, sendChecksum, datanode, clientTraceFmt,
            cachingStrategy);
      } catch(IOException e) {
        String msg = "opReadBlock " + block + " received exception " + e; 
        LOG.info(msg);
        sendResponse(ERROR, msg);
        throw e;
      }
      
      // send op status
      // 发送操纵状态
      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));

      // 调用数据块发送器blockSender的sendBlock()方法,发送数据块
      long read = blockSender.sendBlock(out, baseStream, null); // send data

      if (blockSender.didSendEntireByteRange()) {
        // If we sent the entire range, then we should expect the client
        // to respond with a Status enum.
        try {
          ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
              PBHelper.vintPrefixed(in));
          if (!stat.hasStatus()) {
            LOG.warn("Client " + peer.getRemoteAddressString() +
                " did not send a valid status code after reading. " +
                "Will close connection.");
            IOUtils.closeStream(out);
          }
        } catch (IOException ioe) {
          LOG.debug("Error reading client status response. Will close connection.", ioe);
          IOUtils.closeStream(out);
        }
      } else {
        IOUtils.closeStream(out);
      }
      
      // 数据节点datanode记录相关系统性能指标的增长,这里是读取的字节数、读取的块数
      datanode.metrics.incrBytesRead((int) read);
      datanode.metrics.incrBlocksRead();
      
    } catch ( SocketException ignored ) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
            remoteAddress, ignored);
      }
      // Its ok for remote side to close the connection anytime.
      datanode.metrics.incrBlocksRead();
      IOUtils.closeStream(out);
    } catch ( IOException ioe ) {
      /* What exactly should we do here
       * Earlier version shutdown() datanode if there is disk error.
       */
      LOG.warn(dnR + ":Got exception while serving " + block + " to "
          + remoteAddress, ioe);
      throw ioe;
    } finally {
    	
      // 关闭数据块发送器
      IOUtils.closeStream(blockSender);
    }

    //update metrics
    datanode.metrics.addReadBlockOp(elapsed());
    datanode.metrics.incrReadsFromClient(peer.isLocal());
  }
readBlock()方法大体处理流程如下:

1、将请求中的客户端名称clientName赋值给previousOpClientName;

2、获取输出流baseStream,即socketOut;

3、将输出流baseStream依次包装成BufferedOutputStream、DataOutputStream,其缓冲区大小取参数io.file.buffer.size的一半,参数未配置的话默认为512,且最大也不能超过512;

4、调用checkAccess()方法进行访问权限检查;

5、发送数据块:

5.1、获取数据节点注册信息DatanodeRegistration;

5.2、更新当前线程名称:Sending block...;

5.3、构造数据块发送器BlockSender对象blockSender,构造时,需要对应数据块block、数据在块中的起始位置blockOffset、读取数据的长度length等信息;

5.4、调用writeSuccessWithChecksumInfo()方法发送操作状态;

5.5、调用数据块发送器blockSender的sendBlock()方法,发送数据块;

5.6、数据节点datanode记录相关系统性能指标的增长,这里是读取的字节数、读取的块数;

5.7、关闭数据块发送器。

大体处理流程就是这个样子。而关于BlockSender及其构造、如何定位数据以及如何发送数据等,我们将会在专门的文章中进行分析,敬请期待!






】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用HDFS fsck api在页面上快速查.. 下一篇java   正则匹配 HDFS路径..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目