版权声明:本文为博主原创文章,未经博主允许不得转载。http://mp.blog.csdn.net/configure#i https://blog.csdn.net/wangming520liwei/article/details/72772830
Java 实现HDFS API接口
接口:
import java.util.List;
/**
 * HDFS file-system access interface.
 *
 * <pre>
 * Modify Information:
 * Author        Date          Description
 * ============  ===========   ============================
 * xiaoming      2017-05-08    Create this file
 * </pre>
 */
public interface HdfsFileSystem {

    /**
     * Creates a file at the given HDFS path and writes the given contents into it.
     *
     * @param dst      HDFS path of the file to create
     * @param contents data to write into the new file
     * @return true on success; false if the file already exists
     * @throws Exception if the file system cannot be reached or written
     */
    boolean createFile(String dst, String contents) throws Exception;

    /**
     * Uploads a local file to HDFS.
     *
     * @param src local source file path
     * @param dst HDFS destination path
     * @return true on success; false on invalid arguments
     * @throws Exception if the transfer fails
     */
    boolean uploadFile(String src, String dst) throws Exception;

    /**
     * Renames a file.
     *
     * @param oldName current file name
     * @param newName new file name
     * @return true on success; false on failure
     * @throws Exception if the file system cannot be reached
     */
    boolean rename(String oldName, String newName) throws Exception;

    /**
     * Creates a directory (including missing parents).
     *
     * @param filePath directory path to create
     * @return true on success; false on failure
     * @throws Exception if the file system cannot be reached
     */
    boolean mkdir(String filePath) throws Exception;

    /**
     * Deletes a file or directory.
     *
     * @param path path to delete (javadoc previously named this "filePath",
     *             which did not match the actual parameter name)
     * @return true on success; false on failure
     * @throws Exception if the file system cannot be reached
     */
    boolean delete(String path) throws Exception;

    /**
     * Downloads an HDFS file to the local file system.
     *
     * @param src HDFS source path
     * @param dst local destination path
     * @return true on success; false on failure
     * @throws Exception if the transfer fails
     */
    boolean downloadFile(String src, String dst) throws Exception;

    /**
     * Checks whether a path exists.
     *
     * @param path path to check
     * @return true if the path exists; false otherwise
     * @throws Exception if the file system cannot be reached
     */
    boolean isExist(String path) throws Exception;

    /**
     * Reads a file from the distributed file system, returning one string per line.
     *
     * @param path HDFS file path
     * @return list of lines on success; null on invalid arguments
     * @throws Exception if the read fails
     */
    List<String> readFile(String path) throws Exception;

    /**
     * Reads every file inside a directory, returning the accumulated lines.
     *
     * @param path HDFS directory path
     * @return accumulated lines of all files; null on invalid arguments
     * @throws Exception if the read fails
     */
    List<String> readFileFromDirectory(String path) throws Exception;
}
不多说,看代码:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import cpcn.payment.tool.lang.StringUtil;
import cpcn.payment.tool.middleware.log.LogType;
import cpcn.payment.tool.middleware.log.Loggerx;
/**
 * HDFS-backed implementation of {@link HdfsFileSystem}.
 *
 * <pre>
 * Modify Information:
 * Author        Date          Description
 * ============  ===========   ============================
 * wangming      2017-05-10    Create this file
 * </pre>
 */
public class SmartHdfsDAO implements HdfsFileSystem {

    protected static final Loggerx LOGGER = Loggerx.getLogger("dao");

    /** NameNode RPC address used as the default file system. */
    private static final String FS_DEFAULT_NAME = "hdfs://192.168.121.167:8020";
    /** JobTracker address (not actually used by pure-HDFS operations). */
    private static final String JOB_TRACKER = "hdfs://192.168.121.167:8030";

    /**
     * Builds the cluster configuration and opens a {@link FileSystem} handle.
     * This setup was previously duplicated inline in every public method.
     *
     * @return an open FileSystem; the caller is responsible for closing it
     * @throws IOException if the file system cannot be contacted
     */
    private static FileSystem openFileSystem() throws IOException {
        // Needed when testing locally on Windows; harmless on Linux.
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        Configuration conf = new Configuration();
        // Fixed typo: the key was previously misspelled "mapred.jop.tracker",
        // so this setting was silently ignored.
        conf.set("mapred.job.tracker", JOB_TRACKER);
        // "fs.default.name" is deprecated in favor of "fs.defaultFS" but is
        // kept here for compatibility with the cluster's existing config.
        conf.set("fs.default.name", FS_DEFAULT_NAME);
        return FileSystem.get(conf);
    }

    /**
     * Creates a file at the given HDFS path and writes the contents into it.
     *
     * @param dst      HDFS path of the file to create
     * @param contents data to write
     * @return true on success; false if the file already exists or the arguments are invalid
     * @throws Exception if the write fails
     */
    @Override
    public boolean createFile(String dst, String contents) throws Exception {
        // try-with-resources closes both handles even on failure; the previous
        // finally block NPE'd if FileSystem.get() itself threw.
        try (FileSystem fs = openFileSystem()) {
            Path path = new Path(dst);
            if (fs.exists(path)) {
                // Honor the documented contract: fail when the file already
                // exists instead of silently overwriting it.
                return false;
            }
            try (FSDataOutputStream outputStream = fs.create(path)) {
                // Write as UTF-8 bytes. The previous writeBytes() call truncated
                // each char to its low byte, corrupting any non-ASCII content.
                outputStream.write(contents.getBytes(StandardCharsets.UTF_8));
            }
            return true;
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs createFile][dst]=[" + dst + "][contents]=[" + contents + "]");
            return false;
        }
    }

    /**
     * Uploads a local file to HDFS without deleting the source.
     *
     * @param src local source file path
     * @param dst HDFS destination path
     * @return true on success; false on invalid arguments
     * @throws Exception if the transfer fails
     */
    @Override
    public boolean uploadFile(String src, String dst) throws Exception {
        try (FileSystem fs = openFileSystem()) {
            // First argument: whether to delete the source file (false = keep it).
            fs.copyFromLocalFile(false, new Path(src), new Path(dst));
            return true;
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs uploadFile][src]=[" + src + "][dst]=[" + dst + "]");
            return false;
        }
    }

    /**
     * Renames a file.
     *
     * @param oldName current file name
     * @param newName new file name
     * @return true on success; false on failure
     * @throws IOException if the file system cannot be reached
     */
    @Override
    public boolean rename(String oldName, String newName) throws IOException {
        try (FileSystem fs = openFileSystem()) {
            return fs.rename(new Path(oldName), new Path(newName));
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs rename][oldName]=[" + oldName + "][newName]=[" + newName + "]");
            return false;
        }
    }

    /**
     * Creates a directory, including any missing parents.
     *
     * @param filePath directory path to create
     * @return true on success; false on failure
     * @throws Exception if the file system cannot be reached
     */
    @Override
    public boolean mkdir(String filePath) throws Exception {
        try (FileSystem fs = openFileSystem()) {
            return fs.mkdirs(new Path(filePath));
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs mkdir][filePath]=[" + filePath + "]");
            return false;
        }
    }

    /**
     * Deletes a file or directory (recursively).
     *
     * @param path path to delete
     * @return true if the path was deleted; false if it did not exist or the arguments were invalid
     * @throws Exception if the file system cannot be reached
     */
    @Override
    public boolean delete(String path) throws Exception {
        try (FileSystem fs = openFileSystem()) {
            // Delete immediately instead of deleteOnExit(), which only marks the
            // path for deletion when the FileSystem handle is closed.
            return fs.delete(new Path(path), true);
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs delete][path]=[" + path + "]");
            return false;
        }
    }

    /**
     * Downloads an HDFS file to the local file system.
     *
     * @param src HDFS source path
     * @param dst local destination path
     * @return true on success; false on invalid arguments
     * @throws Exception if the transfer fails
     */
    @Override
    public boolean downloadFile(String src, String dst) throws Exception {
        try (FileSystem fs = openFileSystem()) {
            fs.copyToLocalFile(new Path(src), new Path(dst));
            // Bug fix: the previous implementation returned false even on success.
            return true;
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs downloadFile][src]=[" + src + "][dst]=[" + dst + "]");
            return false;
        }
    }

    /**
     * Checks whether a path exists.
     *
     * @param path path to check
     * @return true if the path exists; false otherwise
     * @throws Exception if the file system cannot be reached or the arguments are invalid
     */
    @Override
    public boolean isExist(String path) throws Exception {
        try (FileSystem fs = openFileSystem()) {
            return fs.exists(new Path(path));
        } catch (IllegalArgumentException e) {
            // NOTE: unlike the other methods, this one logs and rethrows —
            // preserved from the original implementation.
            LOGGER.info(LogType.INFO, "[hdfs isExist][path]=[" + path + "]");
            throw e;
        }
    }

    /**
     * Reads a distributed-file-system file, returning one string per non-empty line.
     *
     * @param path HDFS file path
     * @return list of lines on success; null on invalid arguments
     * @throws Exception if the read fails
     */
    @Override
    public List<String> readFile(String path) throws Exception {
        List<String> data = new ArrayList<String>();
        try (FileSystem fs = openFileSystem();
             FSDataInputStream in = fs.open(new Path(path));
             // Read as UTF-8 explicitly; the previous code used the platform
             // default charset, which varies between Windows and Linux.
             BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while (null != (line = br.readLine())) {
                if (!StringUtil.isEmpty(line)) {
                    data.add(line);
                }
            }
        } catch (IllegalArgumentException e) {
            LOGGER.info(LogType.INFO, "[hdfs readFile][path]=[" + path + "]");
            return null;
        }
        return data;
    }

    /**
     * Reads every file directly inside a directory, accumulating all non-empty lines.
     *
     * @param path HDFS directory path; a non-directory path yields an empty list
     * @return accumulated lines of all files; null on invalid arguments
     * @throws Exception if the read fails
     */
    @Override
    public List<String> readFileFromDirectory(String path) throws Exception {
        List<String> data = new ArrayList<String>();
        try (FileSystem fs = openFileSystem()) {
            Path srcPath = new Path(path);
            if (fs.isDirectory(srcPath)) {
                for (FileStatus file : fs.listStatus(srcPath)) {
                    // Close each file's reader before moving on; the previous
                    // loop leaked every reader except the last one.
                    try (FSDataInputStream in = fs.open(file.getPath());
                         BufferedReader br = new BufferedReader(
                                 new InputStreamReader(in, StandardCharsets.UTF_8))) {
                        String line;
                        while (null != (line = br.readLine())) {
                            if (!StringUtil.isEmpty(line)) {
                                data.add(line);
                            }
                        }
                    }
                }
            }
        } catch (IllegalArgumentException e) {
            // Fixed copy-paste log tag: previously logged as "[hdfs readFile]".
            LOGGER.info(LogType.INFO, "[hdfs readFileFromDirectory][path]=[" + path + "]");
            return null;
        }
        return data;
    }
}
增加一个方法:获取HDFS HA Active 节点
/**
 * Resolves the currently-active NameNode of the HA pair and returns its
 * address as an hdfs:// URI.
 *
 * @return URI of the active NameNode, e.g. "hdfs://192.168.121.167:8020"
 * @throws IOException if the active NameNode cannot be determined
 */
public String getActiveNameNode() throws IOException {
    InetSocketAddress activeNameNode = HAUtil.getAddressOfActive(fileSystem);
    String host = activeNameNode.getAddress().getHostAddress();
    int port = activeNameNode.getPort();
    return "hdfs://" + host + ":" + port;
}