设为首页 加入收藏

TOP

HDFS初级编程实践
2018-11-21 14:55:03 】 浏览:51
Tags:HDFS 初级 编程 实践

目录

0.码仙励志

1.开发环境要求

2.创建项目

3.编写jar包的依赖信息

4.远程操作HDFS的代码实现

(1)import一些需要用到的类

(2)远程连接HDFS系统的代码实现

(3)创建hdfs文件夹的代码实现

(4)实现往HDFS上传文件的代码

(5)从HDFS下载文件到本地代码实现

(6)在运行中读取HDFS文件的代码实现

(7)HDFS文件重命名代码实现

(8)在程序中创建文件的代码实现

(9)追加内容到某个文件的代码实现

(10)删除HDFS文件的代码实现

(11)整篇代码


0.码仙励志

你害怕一件事,可还是要去做,那才是勇敢

1.开发环境要求

  1. java jdk 1.8
  2. eclipse 2018
  3. maven 2.7.5
  4. 远程HDFS服务支持

2.创建项目

首先打开eclipse,创建maven项目

创建完毕后检查下jdk版本是否正确,在鼠标移动到项目上点击鼠标右键->Build Path-> Configure Build Path,如下图操作

3.编写jar包的依赖信息

我们开始进入编码工作,由于我们创建的是maven工程,所以要编写pom.xml文件,下面是我编写好的pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.maxian</groupId>
	<artifactId>hdfs</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>hdfs</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<hadoop.version>2.7.5</hadoop.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-common</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
			<version>${hadoop.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-yarn-common</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>${hadoop.version}</version>
		</dependency>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

编写完毕后保存,在工程根目录点鼠标右键->Maven ->Update Project。

4.远程操作HDFS的代码实现

首先先在com.maxian.hdfs包下创建一个叫TestHdfs的类

(1)import一些需要用到的类

现在我们可以开始编写JAVA代码了,打开com.maxian.hdfs下的TestHdfs.java类,import一些需要用到的类:

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;

(2)远程连接HDFS系统的代码实现

之后我们实例化Configuration和定义FileSystem 这两个静态常量,FileSystem是一个实现了文件系统的抽象类,继承自org.apache.hadoop.conf.Configured,并实现了 Closeable接口,可以适用于多种文件系统,如本地文件系统 file://、ftp、hdfs等。 代码如下:

static Configuration conf = new Configuration();
static FileSystem hdfs;

接着编写远程连接HDFS系统的一些配置信息,代码如下:

static {
	UserGroupInformation ugi = UserGroupInformation.createRemoteUser("root");
	try {
		ugi.doAs(new PrivilegedExceptionAction<Void>() {
			public Void run() throws Exception {
				Configuration conf = new Configuration();
				conf.set("fs.defaultFS", "hdfs://192.168.56.110:9000/");
				// 以下两行是支持 hdfs的追加功能的:hdfs.append()
				conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
				conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
				Path path = new Path("hdfs://192.168.56.110:9000/");
				hdfs = FileSystem.get(path.toUri(), conf);
				// hdfs = path.getFileSystem(conf); // 这个也可以
				return null;
			}
		});
	} catch (IOException e) {
		e.printStackTrace();
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

(3)创建hdfs文件夹的代码实现

// 创建文件夹方法
public static void createDir(String dir) throws IOException {
	// String dir = "/test3/";
	Path path = new Path(dir);
	// 判断文件夹是否已存在,如果已存在就不再创建。
	if (hdfs.exists(path)) {
		System.out.println("文件夹 \t" + dir + "\t 已存在");
		return;
	}
	// 开始创建文件夹
	hdfs.mkdirs(path);
	System.out.println("新建文件夹 \t" + dir);
}

代码编写完毕后,我们在入口方法main调用createDir方法进行验证,代码如下:

public static void main( String[] args ) throws IOException
{
    createDir("/test1");
}

现在我们可以开始来验证代码是否可行,首先确认集群的hadoop的dfs服务是否已经启动,确认dfs已经正常启动后,我们在该类的编辑界面上右击鼠标,在弹出的菜单中选中Run As -> Java Application开始运行该类。

执行完后我们会在Console中看到相应的执行结果。也可以在集群的master主机中验证是否执行成功,在master中输入命令“hadoop fs -ls /”查看hdfs根目录的文件,如果得到如下图结果证明文件夹创建成功。

(4)实现往HDFS上传文件的代码

// 复制本地文件到HDFS
public static void copyFile(String localSrc, String hdfsDst, String fileName) throws IOException {
	if ("".equals(localSrc)) {
		localSrc = "D:/wordcount/input/myfile.txt";
	}
	if ("".equals(hdfsDst)) {
		hdfsDst = "/test/";
	}
	Path src = new Path(localSrc);
	Path dst = new Path(hdfsDst);
	// 本地文件不存在
	if (!(new File(localSrc)).exists()) {
		System.out.println("Error: 本地文件 \t" + localSrc + "\t 不存在。");
		return;
	}
	// hdfs路径不存在
	if (!hdfs.exists(dst)) {
		System.out.println("Error: hdfs目录 \t" + dst.toUri() + "\t 不存在。");
		return;
	}
	if ("".equals(fileName)) {
		fileName = src.getName();
	}
	String dstPath = dst.toUri() + "/" + fileName;
	System.out.println(dstPath); // "/test2/myfile.txt"
	Path targetPath = new Path(dstPath);
	// 判断上传的文件 hdfs的目录下是否存在
	if (hdfs.exists(targetPath)) {
		System.out.println("Warn: 文件 \t" + dstPath + "\t 已存在。");
	} else {
		// 本地文件上传hdfs
		hdfs.copyFromLocalFile(src, targetPath);
		// 遍历该目录文件
		FileStatus files[] = hdfs.listStatus(dst);
		System.out.println("上传到 \t" + conf.get("fs.defaultFS") + hdfsDst);
		for (FileStatus file : files) {
			System.out.println(file.getPath());
		}
	}
}

代码编写完成后,我们也来运行验证下,在main中注释掉之前的createDir方法,将copyFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	//createDir("/test1");
	copyFile("D:/wordcount/input/myfile.txt","/test1/","myfile.txt");
}

编写完毕后一样在该类的编辑界面上右击鼠标,在弹出的菜单中选中 Run As -> Java Application开始运行该类。执行完后我们会在Console中看到相应的执行结果。也可以在集群的master主机中验证是否执行成功,在master中输入命令“hadoop fs -ls /test1/”查看test1目录的文件,如果得到如下图结果证明文件上传成功。

(5)从HDFS下载文件到本地代码实现

// 下载文件方法
public static void downloadFile() throws IllegalArgumentException, IOException {
	String hdfsDst = "/test1/myfile.txt";
	String localSrc = "D:/test";
	Path dst = new Path(hdfsDst);
	Path src = new Path(localSrc);
	String localFile = localSrc + "/" + dst.getName(); // 本地的路径 + hdfs下载的文件名
	if (!hdfs.exists(dst.getParent())) { // 如果HDFS路径不存在
		System.out.println("Error : HDFS路径:\t" + dst.getParent() + "\t 不存在!");
		return;
	}
	if (!new File(localSrc).exists()) { // 如果本地目录不存在,则创建
		new File(localSrc).mkdirs();
		System.out.println("Warn : 本地目录已创建!");
	}
	if (new File(localFile).exists()) { // 如果本地文件存在
		System.out.println("Error : 本地文件: \t" + localFile + "\t 已存在.");
		return;
	}
	if (!hdfs.exists(new Path(hdfsDst))) { // 如果HDFS文件不存在
		System.out.println("Error : HDFS文件: \t" + hdfsDst + "\t 不存在.");
	} else {
		// HDFS下载文件到本地
		hdfs.copyToLocalFile(false, dst, src, true);
		System.out.println("successful :下载成功! 请查看: \t" + localSrc);
	}
}

代码编写完成后,我们也来运行验证下,在main中注释掉之前的copyFile方法,将downloadFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	downloadFile();
}

(6)在运行中读取HDFS文件的代码实现

// 读取文件方法
public static void readFile() throws IOException {
	String uri = "/test1/myfile.txt";
	// 判断文件是否存在
	if (!hdfs.exists(new Path(uri))) {
		System.out.println("Error ; 文件不存在.");
		return;
	}
	InputStream in = null;
	try {
		in = hdfs.open(new Path(uri));
		// 复制到标准输出流
		IOUtils.copyBytes(in, System.out, 4096, false);
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		IOUtils.closeStream(in);
	}
}

代码编写完成后,我们也来运行验证下,在main中注释掉之前的downloadFile方法,将readFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	readFile();
}

(7)HDFS文件重命名代码实现

// 重命名方法
public static void renameFile() throws IOException {
	String oldName = "/test1/myfile.txt";
	String newName = "/test1/readme_1.txt";
	Path oldPath = new Path(oldName);
	Path newPath = new Path(newName);
	if (hdfs.exists(oldPath)) {
		hdfs.rename(oldPath, newPath);
		System.out.println("rename成功!");
	} else {
		System.out.println("文件不存在!rename失败!");
	}
}

代码编写完成后,我们也来运行验证下,在main中注释掉之前的readFile方法,将renameFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	renameFile();
}

编写完毕后一样在该类的编辑界面上右击鼠标,在弹出的菜单中选中Run As -> Java Application开始运行该类。执行完后我们会在Console中看到相应的执行结果。也可以在集群的master主机中验证是否执行成功,在master中输入命令“hadoop fs -ls /test1/”查看test1目录的文件,查看文件名是否已经重命名,如果得到如下图结果证明文件重命名成功。

(8)在程序中创建文件的代码实现

// 创建文件方法
public static void createFile() throws IOException {
	String fileName = "/test1/file1.txt";
	String fileContent = "this is new file.";
	Path dst = new Path(fileName);
	// 判断 新建的文件在hdfs上是否存在
	if (hdfs.exists(dst)) {
		System.out.println("Error : 文件已存在.");
	} else {
		// 将文件内容转成字节数组
		byte[] bytes = fileContent.getBytes();
		FSDataOutputStream output = hdfs.create(dst);
		output.write(bytes);
		output.close();
		System.out.println("创建文件 \t" + fileName);
	}
}

代码编写完成后,我们也来运行验证下,在main中注释掉之前的renameFile方法,将createFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	createFile();
}

接着看一下该文件里面的内容

(9)追加内容到某个文件的代码实现

// 追加内容方法
public static void appendFile() throws IOException {
	String fileName = "/test1/file1.txt";
	String fileContent = "Here is an additional content";
	Path dst = new Path(fileName);
	byte[] bytes = fileContent.getBytes();
	// 如果文件不存在
	if (!hdfs.exists(dst)) {
		System.out.println("Error : 文件不存在。");
		return;
	}
	FSDataOutputStream output = hdfs.append(dst);
	output.write(bytes);
	output.close();
	System.out.println("successful: 追加内容到 \t" + fileName);
}

代码编写完成后,我们一样也来运行验证下,在main中注释掉之前的createFile方法,将appendFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	appendFile();
}

(10)删除HDFS文件的代码实现

//删除HDFS文件的代码实现
public static void deleteFile(String fileName) throws IOException {
    if("".equals(fileName)) {
    fileName = "/test2/file1.txt";
    }
        Path f = new Path(fileName);
        boolean isExists = hdfs.exists(f);
        if (isExists) { // if exists, delete  
            boolean isDel = hdfs.delete(f, true);
            System.out.println(fileName + "  删除状态:  " + isDel);
        } else {
            System.out.println(fileName + "  文件不存在。");
        }
}

代码编写完成后,我们一样也来运行验证下,在main中注释掉之前的appendFile方法,将deleteFile方法写进去,代码如下:

public static void main(String[] args) throws IOException {
	deleteFile("/test1/file1.txt");
}

编写完毕后一样在该类的编辑界面上右击鼠标,在弹出的菜单中选中Run As -> Java Application开始运行该类。执行完后我们会在Console中看到相应的执行结果。也可以在集群的master主机中验证是否执行成功,在master中输入命令“hadoop fs -ls /test1/”查看test1目录的file1.txt文件是否已经被删除。

(11)整篇代码

package com.maxian.hdfs;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;

public class TestHdfs {

	static Configuration conf = new Configuration();
	static FileSystem hdfs;
	static {
		UserGroupInformation ugi = UserGroupInformation.createRemoteUser("root");
		try {
			ugi.doAs(new PrivilegedExceptionAction<Void>() {
				public Void run() throws Exception {
					Configuration conf = new Configuration();
					conf.set("fs.defaultFS", "hdfs://192.168.56.110:9000/");
					// 以下两行是支持 hdfs的追加功能的:hdfs.append()
					conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
					conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
					Path path = new Path("hdfs://192.168.56.110:9000/");
					hdfs = FileSystem.get(path.toUri(), conf);
					// hdfs = path.getFileSystem(conf); // 这个也可以
					return null;
				}
			});
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	// 创建文件夹方法
	public static void createDir(String dir) throws IOException {
		// String dir = "/test3/";
		Path path = new Path(dir);
		// 判断文件夹是否已存在,如果已存在就不再创建。
		if (hdfs.exists(path)) {
			System.out.println("文件夹 \t" + dir + "\t 已存在");
			return;
		}
		// 开始创建文件夹
		hdfs.mkdirs(path);
		System.out.println("新建文件夹 \t" + dir);
	}

	// 复制本地文件到HDFS
	public static void copyFile(String localSrc, String hdfsDst, String fileName) throws IOException {
		if ("".equals(localSrc)) {
			localSrc = "D:/wordcount/input/myfile.txt";
		}
		if ("".equals(hdfsDst)) {
			hdfsDst = "/test/";
		}
		Path src = new Path(localSrc);
		Path dst = new Path(hdfsDst);
		// 本地文件不存在
		if (!(new File(localSrc)).exists()) {
			System.out.println("Error: 本地文件 \t" + localSrc + "\t 不存在。");
			return;
		}
		// hdfs路径不存在
		if (!hdfs.exists(dst)) {
			System.out.println("Error: hdfs目录 \t" + dst.toUri() + "\t 不存在。");
			return;
		}
		if ("".equals(fileName)) {
			fileName = src.getName();
		}
		String dstPath = dst.toUri() + "/" + fileName;
		System.out.println(dstPath); // "/test2/myfile.txt"
		Path targetPath = new Path(dstPath);
		// 判断上传的文件 hdfs的目录下是否存在
		if (hdfs.exists(targetPath)) {
			System.out.println("Warn: 文件 \t" + dstPath + "\t 已存在。");
		} else {
			// 本地文件上传hdfs
			hdfs.copyFromLocalFile(src, targetPath);
			// 遍历该目录文件
			FileStatus files[] = hdfs.listStatus(dst);
			System.out.println("上传到 \t" + conf.get("fs.defaultFS") + hdfsDst);
			for (FileStatus file : files) {
				System.out.println(file.getPath());
			}
		}
	}

	// 下载文件方法
	public static void downloadFile() throws IllegalArgumentException, IOException {
		String hdfsDst = "/test1/myfile.txt";
		String localSrc = "D:/test";
		Path dst = new Path(hdfsDst);
		Path src = new Path(localSrc);
		String localFile = localSrc + "/" + dst.getName(); // 本地的路径 + hdfs下载的文件名
		if (!hdfs.exists(dst.getParent())) { // 如果HDFS路径不存在
			System.out.println("Error : HDFS路径:\t" + dst.getParent() + "\t 不存在!");
			return;
		}
		if (!new File(localSrc).exists()) { // 如果本地目录不存在,则创建
			new File(localSrc).mkdirs();
			System.out.println("Warn : 本地目录已创建!");
		}
		if (new File(localFile).exists()) { // 如果本地文件存在
			System.out.println("Error : 本地文件: \t" + localFile + "\t 已存在.");
			return;
		}
		if (!hdfs.exists(new Path(hdfsDst))) { // 如果HDFS文件不存在
			System.out.println("Error : HDFS文件: \t" + hdfsDst + "\t 不存在.");
		} else {
			// HDFS下载文件到本地
			hdfs.copyToLocalFile(false, dst, src, true);
			System.out.println("successful :下载成功! 请查看: \t" + localSrc);
		}
	}

	// 读取文件方法
	public static void readFile() throws IOException {
		String uri = "/test1/myfile.txt";
		// 判断文件是否存在
		if (!hdfs.exists(new Path(uri))) {
			System.out.println("Error ; 文件不存在.");
			return;
		}
		InputStream in = null;
		try {
			in = hdfs.open(new Path(uri));
			// 复制到标准输出流
			IOUtils.copyBytes(in, System.out, 4096, false);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(in);
		}
	}

	// 重命名方法
	public static void renameFile() throws IOException {
		String oldName = "/test1/myfile.txt";
		String newName = "/test1/readme_1.txt";
		Path oldPath = new Path(oldName);
		Path newPath = new Path(newName);
		if (hdfs.exists(oldPath)) {
			hdfs.rename(oldPath, newPath);
			System.out.println("rename成功!");
		} else {
			System.out.println("文件不存在!rename失败!");
		}
	}

	// 创建文件方法
	public static void createFile() throws IOException {
		String fileName = "/test1/file1.txt";
		String fileContent = "this is new file.";
		Path dst = new Path(fileName);
		// 判断 新建的文件在hdfs上是否存在
		if (hdfs.exists(dst)) {
			System.out.println("Error : 文件已存在.");
		} else {
			// 将文件内容转成字节数组
			byte[] bytes = fileContent.getBytes();
			FSDataOutputStream output = hdfs.create(dst);
			output.write(bytes);
			output.close();
			System.out.println("创建文件 \t" + fileName);
		}
	}

	// 追加内容方法
	public static void appendFile() throws IOException {
		String fileName = "/test1/file1.txt";
		String fileContent = "Here is an additional content";
		Path dst = new Path(fileName);
		byte[] bytes = fileContent.getBytes();
		// 如果文件不存在
		if (!hdfs.exists(dst)) {
			System.out.println("Error : 文件不存在。");
			return;
		}
		FSDataOutputStream output = hdfs.append(dst);
		output.write(bytes);
		output.close();
		System.out.println("successful: 追加内容到 \t" + fileName);
	}

	// 删除HDFS文件的代码实现
	public static void deleteFile(String fileName) throws IOException {
		if ("".equals(fileName)) {
			fileName = "/test2/file1.txt";
		}
		Path f = new Path(fileName);
		boolean isExists = hdfs.exists(f);
		if (isExists) { // if exists, delete
			boolean isDel = hdfs.delete(f, true);
			System.out.println(fileName + "  删除状态:  " + isDel);
		} else {
			System.out.println(fileName + "  文件不存在。");
		}
	}

	public static void main(String[] args) throws IOException {
//		createDir("/test1");
//		copyFile("D:/wordcount/input/myfile.txt", "/test1/", "myfile.txt");
//		downloadFile();
//		readFile();
//		renameFile();
//		createFile();
//		appendFile();
		deleteFile("/test1/file1.txt");
	}
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hadoop   HDFS基本操作(ubu.. 下一篇HDFS修改Blocksize

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目