设为首页 加入收藏

TOP

HDFS API 读操作 -seek指针操作 ,append追加文件
2018-11-13 14:11:48 】 浏览:173
Tags:HDFS API 操作 -seek 指针 append 追加 文件

1.通过java.net.URL类访问写入HDFS数据

/*
	 * 通过java.net.URL类访问写入HDFS数据
	 * 结论:通过URL的方式不能实现对HDFS的写操作,抛java.net.UnknownServiceException: 
protocol (协议)doesn't support output
	 */
	@Test
	public void writeByURL() throws Exception {
		URL _url  =new URL("hdfs://master1:9000/spaceQuota/hello.txt");
		URLConnection conn = _url.openConnection();
		OutputStream out = conn.getOutputStream();
		out.write("hello world".getBytes()); //向集群文件 写hello world
		out.close();
	}    

2.通过FileSystem API做write操作

  /*
	 * 通过FileSystem API做写操作
	 */
	@Test
	public void  writeByAPI() throws IOException{
		Configuration conf = new Configuration();
		FileSystem fs  =FileSystem.get(conf);
		Path file = new Path("/spaceQuota/hello.txt");
		FSDataOutputStream out = fs.create(file);
		out.write("hello world".getBytes());
		out.close();
	}

3.通过FileSystem API做写操作,动态设置相关参数:replication为2和blocksize为10字节

-----------------------------------------------------------------------------------------------
	/**
	 * 通过FileSystem API做写操作,动态设置相关参数:replication和blocksize
	 * 有一些参数集群生效,例如:dfs.namenode.fs-limits.min-block-size=10
	 * 注:加载config信息的优先级:
	 *     代码级-->{classPath/***-site.xml}-->{HOADOOP_HOME/etc/haddop/***-site.xml}
	 */
	@Test
	public void writeByAPIForBlocksize() throws IOException{
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://master1:9000");
		//conf.set("dfs.bytes-per-checksum", "10");//设置校验和为10
		FileSystem fs  =FileSystem.get(conf);
		Path file = new Path("/spaceQuota/hello6666.txt");
		FSDataOutputStream out = fs.create(file, true, 4096,(short)2, 10);
  //fs.create(f, overwrite, bufferSize, replication, blockSize)
		out.write("hello world".getBytes());
		out.close();
	}
问题:出现如下错误时:
org.apache.hadoop.HadoopIllegalArgumentException:
Invalid values: dfs.bytes-per-checksum (=512) must divide block size (=10).
原因是:block大小设置为10时,相应的dfs.bytes-per-checksum 也要重新设置
解决:
 conf.set("dfs.bytes-per-checksum", "10");//设置校验和为10

4.seek操作

/*
	 * 通过FileSystem API做read操作, 设置位置信息seek()
	 * 总结:FSDataInputStream:可执行Seekable(),
	 * 而FSDataOutputStream没有,
	 */
	@Test
	public void seekByAPI() throws IOException{
		Configuration conf=new Configuration();
		FileSystem fs=FileSystem.get(conf);
		Path file=new Path("/spaceQuota/hello.txt");
		FSDataInputStream in=fs.open(file);
		IOUtils.copyBytes(in,System.out,4096,false);//注意:流不能关闭
		in.seek(0);//使输入指针回到0,相当于又读了一遍到控制台
		IOUtils.copyBytes(in,System.out,4096,true);
	}
结果:

5.测试一致模型 :描述文件读/写的数据可见性

---------------------------------------------------------------------------------------
	/**
	 * 通过读写,测试文件系统的一致模型
	 */
	@Test
	public void  writeByAPIText() throws IOException{
		Configuration conf = new Configuration();
		conf.set("dfs.bytes-per-checksum", "10");
		FileSystem fs  =FileSystem.get(conf);
		Path file = new Path("/spaceQuota/hello.txt");
		//创建文件时,文件立即可见
		FSDataOutputStream out = fs.create(file,true,4096,(short)2,10);
		out.write("hello world1a".getBytes());
		out.flush();
		out.write("hello world2a".getBytes());
		out.hflush();
		out.write("hello world3a".getBytes());
		out.hsync();
		out.close();
	}	
	@Test
	public void  readByAPIText() throws IOException{
		Configuration conf = new Configuration();
		FileSystem fs  =FileSystem.get(conf);
		Path file = new Path("/spaceQuota/hello.txt");
		FSDataInputStream in = fs.open(file);
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		byte[] b = new byte[1024];
		int bytesRead;
		while ((bytesRead = in.read(b))!=-1) {
			 out.write(b, 0, bytesRead);
		}
		System.out.println(new String(out.toByteArray()));
//		IOUtils.copyBytes(in, System.out, 4096,true);
	}

6.append追加操作

---------------------------------------------------------------------------------------------
	/*
	 * 注意:如集群节点少于3个,会抛异常;解决方案修改
【dfs.client.block.write.replace-datanode-on-failure.policy=NEVER】
	 */
	@Test
	public void  appendByAPI() throws IOException{
		Configuration conf = new Configuration();
		conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); //修改属性参数
		FileSystem fs = FileSystem.get(conf);
		Path file = new Path("/spaceQuota/hello.txt");
		FSDataOutputStream out = fs.append(file);
		out.writeChars("aaaa");
		out.close();
	}









】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇使用IDEA 搭建 spark on yarn 的.. 下一篇HDFS Shell 常用命令

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目