Pulling HDFS Files from a Hadoop Cluster

Pulling HDFS files down from a Hadoop cluster is a common requirement, and it can be done with nothing more than the org.apache.hadoop client classes.

The obvious drawback of the Hadoop artifacts is that they pull in far too many transitive dependencies, so you frequently have to add exclusions, including but not limited to httpclient, servlet, slf4j, tomcat, and so on.
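For example, the exclusions might look roughly like the Maven snippet below. This is only a sketch: the hadoop-client version and the excluded coordinates are assumptions, so check mvn dependency:tree for the conflicts that actually show up in your own project.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version> <!-- placeholder version -->
    <exclusions>
        <!-- illustrative exclusions only; adjust to the conflicts your build reports -->
        <exclusion>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty</artifactId>
        </exclusion>
    </exclusions>
</dependency>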


import java.io.File;
import java.io.IOException;

// StringUtils/StopWatch are assumed to come from commons-lang3; adjust the
// imports if your project uses the older commons-lang coordinates instead.
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class HdfsClient{

    private static final Logger logger = LoggerFactory.getLogger(HdfsClient.class);
    private FileSystem fileSystem;
    private Configuration conf;

    public synchronized void init() throws Exception {

        String proxy = "x.x.x.x:x";
        String username = "xxx";
        boolean useProxy = false;

        conf = new Configuration();
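        // HDFS HA setup: "argo" is the logical nameservice that fs.defaultFS points at;
        // it resolves to the two NameNodes below, and the failover proxy provider
        // picks whichever one is currently active.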
        conf.set("fs.defaultFS", "hdfs://argo");
        conf.set("dfs.web.ugi", "hdfs,hadoop");
        conf.set("dfs.nameservices", "argo");
        conf.set("dfs.ha.namenodes.argo", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.argo.nn1", "xxx:x");
        conf.set("dfs.namenode.rpc-address.argo.nn2", "xxx:x");
        conf.set("dfs.client.failover.proxy.provider.argo", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        System.setProperty("HADOOP_USER_NAME", username);
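        // Optional SOCKS proxy: route client RPC through the proxy via SocksSocketFactory;
        // the legacy block reader is enabled so data reads also honor that socket factory.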
        if (useProxy) {
            conf.set("hadoop.socks.server", proxy);
            conf.set("hadoop.rpc.socket.factory.class.default", "org.apache.hadoop.net.SocksSocketFactory");
            conf.set("dfs.client.use.legacy.blockreader", "true");
        }

        this.fileSystem = FileSystem.get(conf);
        logger.info("init hdfs client success,proxy=" + proxy + ",username=" + username + ",useProxy=" + useProxy);

    }

    /**
     * Pull non-partitioned data from HDFS to a local directory.
     * @param remotePath HDFS source path
     * @param localPath  local target directory
     */
    public void pullHdfs(String remotePath, String localPath) throws Exception {

        // HDFS paths always use '/', regardless of the local OS separator
        if (!remotePath.endsWith("/")) {
            remotePath = remotePath + "/";
        }
        if (!localPath.endsWith(File.separator)) {
            localPath = localPath + File.separator;
        }

        StopWatch time = new StopWatch();
        time.start();

        // wipe the previous local copy, then pull a fresh one
        File file = new File(localPath);
        deleteFile(file);
        pullData(remotePath, localPath);

        time.stop();
        logger.info("pull {} to {} success! size={} time={}", remotePath, localPath, getDirSize(file), time.getTime());
    }


   
    /**
     * Compute the size of a file, or of a whole directory tree, in bytes.
     * @param file local file or directory
     * @return total size in bytes
     */
    private long getDirSize(final File file) {
        if (file.isFile())
            return file.length();
        final File[] children = file.listFiles();
        long total = 0;
        if (children != null)
            for (final File child : children)
                total += getDirSize(child);
        return total;
    }

    /**
     * Recursively delete the old local data at the given path.
     *
     * @param file local file or directory to delete
     */
    private void deleteFile(File file) {
        if (!file.exists()) {
            return;
        }
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    this.deleteFile(child);
                }
            }
        }
        file.delete();
    }


    /**
     * Pull remote HDFS data to the local path, retrying up to 3 times on failure.
     * @param remotePath
     * @param localPath
     * @throws Exception
     */
    private void pullData(String remotePath, String localPath) throws Exception {

        if (StringUtils.isBlank(remotePath) || StringUtils.isBlank(localPath)) {
            logger.error("Invalid Path!");
            throw new Exception("Invalid Path!");
        }

        int tryNum = 1;
        boolean success = false;
        do {
            try {
                if (hdfsExist(remotePath)) {
                    hdfsPull(remotePath, localPath);
                    File file = new File(localPath);
                    deleteCrcChecksum(file);
                }
                success = true;
            } catch (Exception e) {
                logger.error("error@pullData,remotePath=" + remotePath + ",localPath=" + localPath
                        + ",tryNum=" + tryNum, e);
                tryNum++;
            }
            // retry at most 3 times; a successful attempt breaks out immediately
        } while (!success && tryNum <= 3);

        if (!success) {
            throw new Exception("fail to get " + remotePath + " after 3 tries");
        }
    }


    /**
     * Delete the .crc checksum files and _SUCCESS marker files produced by the copy.
     * @param file local file or directory
     */
    private void deleteCrcChecksum(File file) {
        if (!file.exists()) {
            return;
        }
        if (file.isFile()) {
            String name = file.getName().toLowerCase();
            if (name.endsWith(".crc") || name.endsWith("_success")) {
                file.delete();
            }
        } else if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    this.deleteCrcChecksum(child);
                }
            }
        }
    }

    /**
     * Check whether a remote HDFS path exists.
     * @param dfsPath
     * @return true if the path exists on HDFS
     * @throws IOException
     */
    private boolean hdfsExist(final String dfsPath) throws IOException {
        return fileSystem.exists(new Path(dfsPath));
    }

    /**
     * Copy a remote HDFS path to the local file system.
     * @param dfsPath
     * @param localPath
     * @throws IOException
     */
    private void hdfsPull(final String dfsPath, final String localPath) throws IOException {
        try {
            fileSystem.copyToLocalFile(new Path(dfsPath), new Path(localPath));
        } catch (IOException e) {
            logger.error("Exception@HdfsClient, dfsPath=" + dfsPath + ", localPath="
                    + localPath, e);
            // rethrow so the retry loop in pullData actually sees the failure
            throw e;
        }
    }

}
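A minimal usage sketch of the client above, assuming it lives in a Spring application; the caller class and both paths are hypothetical placeholders, not part of the original code.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Hypothetical caller, for illustration only.
@Component
public class HdfsPullTask {

    @Autowired
    private HdfsClient hdfsClient;

    public void pullOutput() throws Exception {
        hdfsClient.init();  // build the FileSystem once before the first pull
        // both paths below are placeholders
        hdfsClient.pullHdfs("/user/argo/output/dt=20181224", "/data/local/output");
    }
}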



