设为首页 加入收藏

TOP

HDFS读取文件过程
2018-12-06 00:16:19 】 浏览:12
Tags:HDFS 读取 文件 过程
版权声明:本文为博主原创文章,转载请注明出处,Thanks~ https://blog.csdn.net/cnweike/article/details/7102766

从HDFS中读取一个文件,都需要做些什么呢?我们拿一个简单的例子来看一下:

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class FileSystemCat {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		String uri = args[0];
  /**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  public LocatedBlocks getBlockLocations(String src, long offset, long length,
      boolean doAccessTime, boolean needBlockToken) throws IOException {
    if (isPermissionEnabled) {
      checkPathAccess(src, FsAction.READ);
    }

    if (offset < 0) {
      throw new IOException("Negative offset is not supported. File: " + src );
    }
    if (length < 0) {
      throw new IOException("Negative length is not supported. File: " + src );
    }
    final LocatedBlocks ret = getBlockLocationsInternal(src, 
        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);  
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "open", src, null, null);
    }
    return ret;
  }

Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);InputStream in = null;try{in = fs.open(new Path(uri));IOUtils.copyBytes(in, System.out, 4096, false);}finally{IOUtils.closeStream(in);}}}


这个程序就是一个HDFS客户端程序,我们通过这个程序可以指定一个文件的路径,然后读取这个文件的内容。这个过程中都做了哪些事情呢?从总体上看非常简单:打开文件-读取文件-关闭文件。

众所周知,HDFS是以块的形式存储文件的,在我们读取HDFS中的一个文件的时候,我们必须搞清楚这个文件由哪些块组成,这个操作就涉及到对名字节点的访问,因为名字节点存储了文件-块序列的映射信息,并将这些信息持久存储于名字节点上,当然,为了效率方面的考虑,在HDFS启动时会把这些信息加载到内存中,这也从一定程度上反映了名字节点的内存大小限制了真个HDFS集群能存储的文件量的事实。


其实,在这个小的客户端程序中,最重要的就是一行代码:

in = fs.open(new Path(uri));

open方法返回了一个InputStream,对于这个InputStream我向大家都比较熟悉了,是Java内置的数据类型,就是一个输入流。但是怎样来形成这个流就需要费一番周折了,那我们就看一下DistributedFileSystem是怎么实现的,下面我们就一步步追踪代码:

  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

继而追踪到:

  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }

很明显,dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)返回了一个输入流,而外层的包装又装饰了一层,添加了一些其他功能。我们这里只看主要的,这个dfs.open到底做了什么。首先要弄清楚dfs是什么类型的,好吧,我们来看一下它的声明:

DFSClient dfs;

看到它是一个DFSClient对象。我们继续追溯到DFSClient这个类里open方法的实现:

  DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                      FileSystem.Statistics stats
      ) throws IOException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
  }

我们看到了一个非常关键的注释:Get block info from namenode(从名称节点获取块信息)。那么我们就来看一下hdfs是怎样获取到这个文件的块信息的:

  private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                    FileNotFoundException.class);
    }
  }

这是DFSClient中获取块信息的方法,它的第一个参数是一个实现了客户端协议的对象,第二个就是指定的文件路径,然后就是文件偏移量和长度了。里面还是只有一行关键代码:return namenode.getBlockLocations(src, start, length),这个NameNode的一个方法,我们定位到这个方法:

  public LocatedBlocks   getBlockLocations(String src, 
                                          long offset, 
                                          long length) throws IOException {
    myMetrics.incrNumGetBlockLocations();
    return namesystem.getBlockLocations(getClientMachine(), 
                                        src, offset, length);
  }

其中的namesystem是一个FSNamesystem类的实例,定位到上面调用那个方法:

  /**
   * Get block locations within the specified range.
   * 
   * @see #getBlockLocations(String, long, long)
   */
  LocatedBlocks getBlockLocations(String clientMachine, String src,
      long offset, long length) throws IOException {
    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
    if (blocks != null) {
      //sort the blocks
      DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
          clientMachine);
      for (LocatedBlock b : blocks.getLocatedBlocks()) {
        clusterMap.pseudoSortByDistance(client, b.getLocations());
      }
    }
    return blocks;
  }

这个方法的第一个参数是客户端机器,这个参数主要是在计算数据节点与客户端机器的距离时用到的,组成一个文件的所有的块都会按照配置(默认3个)冗余地存储于不同的数据节点上,为了最大程度的提高性能,客户端要直接从最近的那个数据节点上接收数据。后面的3个参数很明了,就是文件的路径、偏移量和长度。然后获取到组成这个文件的块信息、按照与客户端机器的距离排序这些块信息,最后返回。

其实到这里我们还是没有看到到底是怎样获取块信息的,只有再次追溯代码:

  /**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  public LocatedBlocks getBlockLocations(String src, long offset, long length,
      boolean doAccessTime, boolean needBlockToken) throws IOException {
    if (isPermissionEnabled) {
      checkPathAccess(src, FsAction.READ);
    }

    if (offset < 0) {
      throw new IOException("Negative offset is not supported. File: " + src );
    }
    if (length < 0) {
      throw new IOException("Negative length is not supported. File: " + src );
    }
    final LocatedBlocks ret = getBlockLocationsInternal(src, 
        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);  
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "open", src, null, null);
    }
    return ret;
  }

这个方法首先做了一个权限相关的检测,然后对于偏移量和文件长度参数做了一些合法性验证,然后又调用了一个静态方法来获取到相关的块信息,做了一些日志相关的记录工作,最后返回获取的块信息。继续追溯获取块信息的代码:

  private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                       long offset, 
                                                       long length,
                                                       int nrBlocksToReturn,
                                                       boolean doAccessTime, 
                                                       boolean needBlockToken)
                                                       throws IOException {
    INodeFile inode = dir.getFileINode(src);
    if(inode == null) {
      return null;
    }
    if (doAccessTime && isAccessTimeSupported()) {
      dir.setTimes(src, inode, -1, now(), false);
    }
    Block[] blocks = inode.getBlocks();
    if (blocks == null) {
      return null;
    }
    if (blocks.length == 0) {
      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
    }
    List<LocatedBlock> results;
    results = new ArrayList<LocatedBlock>(blocks.length);

    int curBlk = 0;
    long curPos = 0, blkSize = 0;
    int nrBlocks = (blocks[0].getNumBytes() == 0)  0 : blocks.length;
    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
      blkSize = blocks[curBlk].getNumBytes();
      assert blkSize > 0 : "Block of size 0";
      if (curPos + blkSize > offset) {
        break;
      }
      curPos += blkSize;
    }
    
    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
      return null;
    
    long endOff = offset + length;
    
    do {
      // get block locations
      int numNodes = blocksMap.numNodes(blocks[curBlk]);
      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
      if (numCorruptNodes != numCorruptReplicas) {
        LOG.warn("Inconsistent number of corrupt replicas for " + 
            blocks[curBlk] + "blockMap has " + numCorruptNodes + 
            " but corrupt replicas map has " + numCorruptReplicas);
      }
      boolean blockCorrupt = (numCorruptNodes == numNodes);
      int numMachineSet = blockCorrupt  numNodes : 
                            (numNodes - numCorruptNodes);
      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
      if (numMachineSet > 0) {
        numNodes = 0;
        for(Iterator<DatanodeDescriptor> it = 
            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
          DatanodeDescriptor dn = it.next();
          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
            machineSet[numNodes++] = dn;
        }
      }
      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
          blockCorrupt);
      if(isAccessTokenEnabled && needBlockToken) {
        b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
      }
      
      results.add(b); 
      curPos += blocks[curBlk].getNumBytes();
      curBlk++;
    } while (curPos < endOff 
          && curBlk < blocks.length 
          && results.size() < nrBlocksToReturn);
    
    return inode.createLocatedBlocks(results);
  }

在上面这个静态方法中,获取块信息的操作更加具体了,这个方法首先引入了一个(以这个文件路径为参数创建的)索引节点(inode),如果这个索引节点对象为null,说明这个文件不存在,直接返回null。然后做一些访问标记,获取这个索引节点对象的所有块信息(Block[] blocks = inode.getBlocks())。如果获取的块信息为null直接返回null,如果块列表的长度为0,返回一个空的结果;否则先将位置移动到偏移量的位置,并同时计数块编号。如果最后看到满足条件的块编号大于块总数,直接返回null。如果以上的条件都不满足,将在范围之内的块组织到一个ArrayList中,最后用这个列表创建一个LocatedBlocks对象,并返回。




编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇检查HDFS的健康状态 下一篇centos-7 部署hadoop2.5.1 >&g..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }