The mv (rename) operation in Hadoop and Hive
2018-12-27 12:39:26
Tags: hadoop, hive, rename
Copyright notice: this is an original post by the author; reproduction without permission is prohibited. https://blog.csdn.net/zhoudetiankong/article/details/54911707

Environment: Hadoop 2.7.2 + Hive 1.2.1

About a year ago we upgraded Hive from 0.14 to 1.2.1. Afterwards we noticed that the final step of a query, writing out the result data, had become much slower than before. The cause turned out to be a change of defaults: in the new version, intermediate result files are created under the table's own directory, in directories prefixed with .hive-staging_hive_, whereas older versions put them under /tmp/hive. The fix we eventually found:

Change the configuration parameter:

<property>
<name>hive.exec.stagingdir</name>
<value>/tmp/hive/.hive-staging</value>
</property>

The root cause: in Hive 1.2.1, if the staging directory lives under the same root as the target directory (i.e. the staging directory is a subdirectory of the target), the intermediate result data is copied file by file into the target directory. Previously, the staging directory under /tmp/hive was simply renamed to the target directory. The slowdown therefore comes from an extra data-copy pass. The relevant code:

MoveTask.java

private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
      throws Exception {
    FileSystem fs = sourcePath.getFileSystem(conf);
    if (isDfsDir) {
      // Just do a rename on the URIs, they belong to the same FS
      String mesg = "Moving data to: " + targetPath.toString();
      String mesg_detail = " from " + sourcePath.toString();
      console.printInfo(mesg, mesg_detail);

      // if source exists, rename. Otherwise, create a empty directory
      if (fs.exists(sourcePath)) {
        Path deletePath = null;
        // If it multiple level of folder are there fs.rename is failing so first
        // create the targetpath.getParent() if it not exist
        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
          deletePath = createTargetPath(targetPath, fs);
        }
        if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
          try {
            if (deletePath != null) {
              fs.delete(deletePath, true);
            }
          } catch (IOException e) {
            LOG.info("Unable to delete the path created for facilitating rename"
                + deletePath);
          }
          throw new HiveException("Unable to rename: " + sourcePath
              + " to: " + targetPath);
        }
      } else if (!fs.mkdirs(targetPath)) {
        throw new HiveException("Unable to make directory: " + targetPath);
      }
    } else {
      // This is a local file
      String mesg = "Copying data to local directory " + targetPath.toString();
      String mesg_detail = " from " + sourcePath.toString();
      console.printInfo(mesg, mesg_detail);

      // delete the existing dest directory
      LocalFileSystem dstFs = FileSystem.getLocal(conf);

      if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
        console.printInfo(mesg, mesg_detail);
        // if source exists, rename. Otherwise, create a empty directory
        if (fs.exists(sourcePath)) {
          fs.copyToLocalFile(sourcePath, targetPath);
        } else {
          if (!dstFs.mkdirs(targetPath)) {
            throw new HiveException("Unable to make local directory: "
                + targetPath);
          }
        }
      } else {
        throw new AccessControlException(
            "Unable to delete the existing destination directory: "
            + targetPath);
      }
    }
  }

Tracing from MoveTask, the real work happens in the moveFile method of Hive.java:

 public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
      FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
    boolean success = false;

    //needed for perm inheritance.
    boolean inheritPerms = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
    HadoopShims shims = ShimLoader.getHadoopShims();
    HadoopShims.HdfsFileStatus destStatus = null;
    HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();

    // If source path is a subdirectory of the destination path:
    //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
    //   where the staging directory is a subdirectory of the destination directory
    // (1) Do not delete the dest dir before doing the move operation.
    // (2) It is assumed that subdir and dir are in same encryption zone.
    // (3) Move individual files from scr dir to dest dir.
    boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal);
    try {
      if (inheritPerms || replace) {
        try{
          destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
          //if destf is an existing directory:
          //if replace is true, delete followed by rename(mv) is equivalent to replace
          //if replace is false, rename (mv) actually move the src under dest dir
          //if destf is an existing file, rename is actually a replace, and do not need
          // to delete the file first
          if (replace && !destIsSubDir) {
            LOG.debug("The path " + destf.toString() + " is deleted");
            fs.delete(destf, true);
          }
        } catch (FileNotFoundException ignore) {
          //if dest dir does not exist, any re
          if (inheritPerms) {
            destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
          }
        }
      }
      if (!isSrcLocal) {
        // For NOT local src file, rename the file
        if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
            && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf))
        {
          LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
          success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
              true,    // delete source
              replace, // overwrite destination
              conf);
        } else {
          if (destIsSubDir) {
            FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
            if (srcs.length == 0) {
              success = true; // Nothing to move.
            }
            for (FileStatus status : srcs) {
              success = FileUtils.copy(srcf.getFileSystem(conf), status.getPath(), destf.getFileSystem(conf), destf,
                  true,     // delete source
                  replace,  // overwrite destination
                  conf);

              if (!success) {
                throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
              }
            }
          } else {
            success = fs.rename(srcf, destf);
          }
        }
      } else {
        // For local src file, copy to hdfs
        fs.copyFromLocalFile(srcf, destf);
        success = true;
      }

      LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
          + ", dest: " + destf.toString()  + ", Status:" + success);
    } catch (IOException ioe) {
      throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
    }

    if (success && inheritPerms) {
      try {
        ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf);
      } catch (IOException e) {
        LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
      }
    }
    return success;
  }
As the code shows, there are two strategies: if the source directory is a subdirectory of the destination directory (destIsSubDir), every file under the source is copied to the destination. Otherwise, a single rename is performed, which on HDFS is a pure metadata operation with no extra data movement.
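The branching logic above can be distilled into a short sketch. This is a hypothetical illustration using java.nio on the local filesystem rather than HDFS; the isSubDir check and the copy-vs-rename split mirror the structure of Hive.moveFile, not its actual implementation:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class MoveStrategyDemo {
    // Illustrative stand-in for Hive's isSubDir: is src inside dst's tree?
    static boolean isSubDir(Path src, Path dst) {
        return src.toAbsolutePath().normalize()
                  .startsWith(dst.toAbsolutePath().normalize());
    }

    // Mirrors the two strategies: per-file copy (then delete the source)
    // when the source is a subdirectory of the destination, one rename
    // of the whole directory otherwise.
    static void move(Path src, Path dst) throws IOException {
        if (isSubDir(src, dst)) {
            try (Stream<Path> files = Files.list(src)) {
                for (Path f : (Iterable<Path>) files::iterator) {
                    Files.copy(f, dst.resolve(f.getFileName())); // data copy
                    Files.delete(f);                             // delete source
                }
            }
            Files.delete(src); // remove the now-empty staging dir
        } else {
            Files.move(src, dst.resolve(src.getFileName())); // plain rename
        }
    }

    public static void main(String[] args) throws IOException {
        Path dst = Files.createTempDirectory("dest");
        Path staging = Files.createDirectory(dst.resolve(".hive-staging_demo"));
        Files.write(staging.resolve("000000_0"), "row1\n".getBytes());

        move(staging, dst); // staging is under dst, so the copy path is taken
        System.out.println(Files.exists(dst.resolve("000000_0"))); // true
    }
}
```

On a local filesystem a per-file move would also be cheap, but on HDFS the copy branch reads and rewrites every block, which is exactly the overhead observed after the upgrade.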

Why does it behave this way? I ran a small experiment with rename.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TestHdfs {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/tmp/lgh/d1/.dd1");
        Path dst = new Path("/usr");
        fs.rename(src, dst);
    }
}
In the experiment, the .dd1 directory contained data files; after running the program, those data files had been moved to /usr/. For files under a directory whose name starts with a dot, if the source and destination directories do not share a common prefix, rename moves the data files into the destination directory; otherwise, the source directory itself is moved over as well.

Hive's intermediate result files follow exactly this naming convention, with a .hive-staging prefix, which is why the behavior above occurs. You can verify this yourself with hadoop fs -mv.
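For comparison, the commonly documented HDFS rename semantics when the destination already exists as a directory is to move the source under it. The sketch below emulates that move-under behavior locally with java.nio; mv is a hypothetical helper, not HDFS code, and it deliberately does not attempt to reproduce the dot-prefix nuance described above (plain Files.move would simply fail on an existing directory):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameIntoDirDemo {
    // Emulates move-under rename semantics: if dst is an existing
    // directory, src is moved underneath it; otherwise it is a plain
    // rename of src to dst.
    static Path mv(Path src, Path dst) throws IOException {
        Path target = Files.isDirectory(dst) ? dst.resolve(src.getFileName()) : dst;
        return Files.move(src, target);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("mvdemo");
        Path src = Files.createDirectories(root.resolve("d1/.dd1"));
        Files.write(src.resolve("part-00000"), "data".getBytes());
        Path dst = Files.createDirectories(root.resolve("usr"));

        mv(src, dst); // dst exists, so .dd1 is moved under usr/
        System.out.println(Files.exists(dst.resolve(".dd1/part-00000"))); // true
    }
}
```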
