Implementing the HDFS API in Java, and Getting the Active NameNode Address
2018-12-01 00:23:55
Copyright notice: this is the author's original article and may not be reproduced without the author's permission. Original post: https://blog.csdn.net/wangming520liwei/article/details/72772830

Implementing the HDFS API in Java


The interface:

import java.util.List;

/**
 * HDFS file system interface
 * 
 * <pre>
 * Modify Information:
 * Author       Date        Description
 * ============ =========== ============================
 * xiaoming     2017-05-08  Create this file
 * </pre>
 * 
 */
public interface HdfsFileSystem {

    /**
     * Create a file; fails if it already exists
     * 
     * @param dst HDFS path of the file to create
     * @param contents data to write into the file
     * @return true on success, false if the file already exists
     * @throws Exception
     */
    boolean createFile(String dst, String contents) throws Exception;

    /**
     * Upload a local file
     * 
     * @param src local source file
     * @param dst destination path on HDFS
     * 
     * @return true on success, false if the destination already exists
     * @throws Exception
     */
    boolean uploadFile(String src, String dst) throws Exception;

    /**
     * Rename a file
     * 
     * @param oldName original file name
     * @param newName new file name
     * @return true on success, false on failure
     * @throws Exception
     */
    boolean rename(String oldName, String newName) throws Exception;

    /**
     * Create a directory
     * 
     * @param filePath directory path
     * @return true on success, false on failure
     * @throws Exception
     */
    boolean mkdir(String filePath) throws Exception;

    /**
     * Delete a file
     * 
     * @param path file path
     * @return true on success, false on failure
     * @throws Exception
     */
    boolean delete(String path) throws Exception;

    /**
     * Download a file
     * 
     * @param src HDFS source path
     * @param dst local destination path
     * @return true on success, false on failure
     * @throws Exception
     */
    boolean downloadFile(String src, String dst) throws Exception;

    /**
     * Check whether a file exists
     * 
     * @param path file path
     * @return true if it exists, false if it does not
     * @throws Exception
     */
    boolean isExist(String path) throws Exception;

    /**
     * Read a file from the distributed file system, one line per string
     * 
     * @param path file path
     * @return a list of strings on success, null on failure
     * @throws Exception
     */
    List<String> readFile(String path) throws Exception;

    /**
     * Read every file under a directory
     * 
     * @param path directory path
     * @return a list of lines on success, null on failure
     * @throws Exception
     */
    List<String> readFileFromDirectory(String path) throws Exception;

}
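
A quick usage sketch of the interface (it assumes the SmartHdfsDAO implementation shown below; the /tmp/demo path and the file contents are placeholders of my own, not values from the original post):

public class HdfsFileSystemDemo {

    public static void main(String[] args) throws Exception {
        HdfsFileSystem hdfs = new SmartHdfsDAO();

        // Create a file and write one line of text into it
        hdfs.createFile("/tmp/demo/hello.txt", "hello hdfs\n");

        // Check that it exists, then read it back line by line
        if (hdfs.isExist("/tmp/demo/hello.txt")) {
            for (String line : hdfs.readFile("/tmp/demo/hello.txt")) {
                System.out.println(line);
            }
        }

        // Clean up
        hdfs.delete("/tmp/demo/hello.txt");
    }
}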








Without further ado, the implementation:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Utility classes from the author's internal project (string helpers and logging wrapper)
import cpcn.payment.tool.lang.StringUtil;
import cpcn.payment.tool.middleware.log.LogType;
import cpcn.payment.tool.middleware.log.Loggerx;

/**
 * Implementation of the HDFS file system interface
 * 
 * <pre>
 * Modify Information:
 * Author       Date        Description
 * ============ =========== ============================
 * wangming     2017-05-10  Create this file
 * </pre>
 * 
 */
public class SmartHdfsDAO implements HdfsFileSystem {

    protected static final Loggerx LOGGER = Loggerx.getLogger("dao");

    @Override
    public boolean createFile(String dst, String contents) throws Exception {
        Configuration conf = new Configuration();

        // Needed when testing locally on Windows; can be dropped on Linux
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");

        FSDataOutputStream outputStream = null;
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            Path path = new Path(dst);
            outputStream = fs.create(path);
            outputStream.writeBytes(contents);

        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs createFile][dst]=[" + dst + "][contents]=[" + contents + "]");
            return false;
        } catch (Exception e) {
            throw e;
        } finally {
            if (outputStream != null) {
                outputStream.close();
            }
            if (fs != null) {
                fs.close();
            }
        }
        return true;
    }

    @Override
    public boolean uploadFile(String src, String dst) throws Exception {
        Configuration conf = new Configuration();

        // Needed when testing locally on Windows; can be dropped on Linux
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");

        FileSystem fs = null;

        try {
            fs = FileSystem.get(conf);
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            // Copy the local file into HDFS; the first argument controls whether the source is deleted (true = delete, default false)
            fs.copyFromLocalFile(false, srcPath, dstPath);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs uploadFile][src]=[" + src + "][dst]=[" + dst + "]");
            return false;
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }

        return true;
    }

    @Override
    public boolean rename(String oldName, String newName) throws IOException {
        Configuration conf = new Configuration();

        // Needed when testing locally on Windows; can be dropped on Linux
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");

        FileSystem fs = null;
        boolean result = false;

        try {
            fs = FileSystem.get(conf);
            Path oldPath = new Path(oldName);
            Path newPath = new Path(newName);
            result = fs.rename(oldPath, newPath);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs rename][oldName]=[" + oldName + "][newName]=[" + newName + "]");
            return false;
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }
        return result;
    }

    @Override
    public boolean mkdir(String filePath) throws Exception {
        Configuration conf = new Configuration();

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        boolean result = false;
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
            Path srcPath = new Path(filePath);
            result = fs.mkdirs(srcPath);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs mkdir][filePath]=[" + filePath + "]");
            return false;
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }
        return result;
    }

    @Override
    public boolean delete(String path) throws Exception {
        Configuration conf = new Configuration();

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        boolean result = false;
        FileSystem fs = null;

        try {
            fs = FileSystem.get(conf);
            Path filePath = new Path(path);
            // Delete immediately and recursively; deleteOnExit would only remove the path when the FileSystem is closed
            result = fs.delete(filePath, true);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs delete][path]=[" + path + "]");
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }
        return result;
    }

    @Override
    public boolean downloadFile(String src, String dst) throws Exception {

        Configuration conf = new Configuration();

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        FileSystem fs = null;

        try {
            fs = FileSystem.get(conf);
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            fs.copyToLocalFile(srcPath, dstPath);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs downloadFile][src]=[" + src + "][dst]=[" + dst + "]");
            return false;
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }

        return true;
    }

    @Override
    public boolean isExist(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = null;
        boolean result = false;

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");

//      conf.set("mapred.job.tracker", "hdfs://192.168.233.128:9000");
//      conf.set("fs.default.name", "hdfs://192.168.233.128:9000");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        try {
            fs = FileSystem.get(conf);
            Path filePath = new Path(path);
            result = fs.exists(filePath);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs isExist][path]=[" + path + "]");
            throw e;
        } catch (Exception e) {
            throw e;
        } finally {
            if (fs != null) {
                fs.close();
            }
        }
        return result;
    }

    @Override
    public List<String> readFile(String path) throws Exception {
        Configuration conf = new Configuration();

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        List<String> data = new ArrayList<String>();
        FileSystem fs = null;
        FSDataInputStream in = null;
        BufferedReader br = null;

        try {
            fs = FileSystem.get(conf);
            Path srcPath = new Path(path);
            in = fs.open(srcPath);
            br = new BufferedReader(new InputStreamReader(in));
            String line = null;
            while (null != (line = br.readLine())) {
                if (!StringUtil.isEmpty(line)) {
                    data.add(line);
                }
            }
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs readFile][path]=[" + path + "]");
            return null;
        } catch (Exception e) {
            throw e;
        } finally {
            if (br != null) {
                br.close();
            }
            if (fs != null) {
                fs.close();
            }
        }

        return data;
    }
    
    /**
     * Read data from every file under a directory
     * @see cpcn.payment.tool.lang.hdfs.HdfsFileSystem#readFileFromDirectory(java.lang.String)
     */
    @Override
    public List<String> readFileFromDirectory(String path) throws Exception {
        Configuration conf = new Configuration();

        // Needed for Windows testing; remote access fails without it
        conf.set("mapred.job.tracker", "hdfs://192.168.121.167:8030");
        conf.set("fs.default.name", "hdfs://192.168.121.167:8020");
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        List<String> data = new ArrayList<String>();
        FileSystem fs = null;
        FSDataInputStream in = null;
        BufferedReader br = null;

        try {
            fs = FileSystem.get(conf);
            Path srcPath = new Path(path);
            boolean result = fs.isDirectory(srcPath);
            if(result){
                FileStatus[] status = fs.listStatus(srcPath);
                for (FileStatus file : status) {
                    in = fs.open(file.getPath());
                    br = new BufferedReader(new InputStreamReader(in));
                    String line = null;
                    while (null != (line = br.readLine())) {
                        if (!StringUtil.isEmpty(line)) {
                            data.add(line);
                        }
                    }
                    // Close the reader for this file before opening the next one
                    br.close();
                }
            }
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs readFile][path]=[" + path + "]");
            return null;
        } catch (Exception e) {
            throw e;
        } finally {
            if (br != null) {
                br.close();
            }
            if (fs != null) {
                fs.close();
            }
        }
        return data;
    }
}
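
A note on the repeated configuration: every method above rebuilds the same Configuration and closes the FileSystem by hand. Here is a hedged refactoring sketch (the helper name buildConf and the method mkdirSafe are illustrative, not part of the original code): build the configuration once, and let try-with-resources close the FileSystem, which also avoids a NullPointerException in the finally block when FileSystem.get() itself fails.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsConfSketch {

    // Build the client configuration once instead of repeating it in every method
    private static Configuration buildConf() {
        Configuration conf = new Configuration();
        // Only needed for local testing on Windows; can be dropped on Linux
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        // fs.defaultFS is the non-deprecated replacement for fs.default.name
        conf.set("fs.defaultFS", "hdfs://192.168.121.167:8020");
        return conf;
    }

    // FileSystem implements java.io.Closeable, so try-with-resources closes it
    // even if FileSystem.get() or mkdirs() throws
    public static boolean mkdirSafe(String filePath) throws Exception {
        try (FileSystem fs = FileSystem.get(buildConf())) {
            return fs.mkdirs(new Path(filePath));
        }
    }
}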




One more method: getting the HDFS HA active NameNode address

    // Requires imports: java.net.InetAddress, java.net.InetSocketAddress, org.apache.hadoop.hdfs.HAUtil
    public String getActiveNameNode(FileSystem fileSystem) throws IOException {
        // HAUtil resolves which NameNode of the HA pair is currently active
        InetSocketAddress active = HAUtil.getAddressOfActive(fileSystem);
        InetAddress address = active.getAddress();
        return "hdfs://" + address.getHostAddress() + ":" + active.getPort();
    }
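
A hedged usage sketch against an HA-enabled cluster; the nameservice name mycluster, the nn1/nn2 ids, and the NameNode host names below are placeholders for your own HA client configuration, not values from the original post:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ActiveNameNodeDemo {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Standard HDFS HA client settings; "mycluster" and the hosts are placeholders
        conf.set("fs.defaultFS", "hdfs://mycluster");
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "namenode1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "namenode2:8020");
        conf.set("dfs.client.failover.proxy.provider.mycluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        try (FileSystem fs = FileSystem.get(conf)) {
            // Prints something like hdfs://192.168.x.x:8020
            System.out.println(new SmartHdfsDAO().getActiveNameNode(fs));
        }
    }
}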




