1.通过java.net.URL类访问写入HDFS数据
/*
* 通过java.net.URL类访问写入HDFS数据
* 结论:通过URL的方式不能实现对HDFS的写操作,抛java.net.UnknownServiceException:
protocol (协议)doesn't support output
*/
@Test
public void writeByURL() throws Exception {
URL _url =new URL("hdfs://master1:9000/spaceQuota/hello.txt");
URLConnection conn = _url.openConnection();
OutputStream out = conn.getOutputStream();
out.write("hello world".getBytes()); //向集群文件 写hello world
out.close();
}
2.通过FileSystem API做write操作
/*
* 通过FileSystem API做写操作
*/
@Test
public void writeByAPI() throws IOException{
Configuration conf = new Configuration();
FileSystem fs =FileSystem.get(conf);
Path file = new Path("/spaceQuota/hello.txt");
FSDataOutputStream out = fs.create(file);
out.write("hello world".getBytes());
out.close();
}
3.通过FileSystem API做写操作,动态设置相关参数:replication为2和blocksize为10字节
-----------------------------------------------------------------------------------------------
/**
* 通过FileSystem API做写操作,动态设置相关参数:replication和blocksize
* 有一些参数集群生效,例如:dfs.namenode.fs-limits.min-block-size=10
* 注:加载config信息的优先级:
* 代码级-->{classPath/***-site.xml}-->{HOADOOP_HOME/etc/haddop/***-site.xml}
*/
@Test
public void writeByAPIForBlocksize() throws IOException{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master1:9000");
//conf.set("dfs.bytes-per-checksum", "10");//设置校验和为10
FileSystem fs =FileSystem.get(conf);
Path file = new Path("/spaceQuota/hello6666.txt");
FSDataOutputStream out = fs.create(file, true, 4096,(short)2, 10);
//fs.create(f, overwrite, bufferSize, replication, blockSize)
out.write("hello world".getBytes());
out.close();
}
问题:出现如下错误时:
org.apache.hadoop.HadoopIllegalArgumentException:
Invalid values: dfs.bytes-per-checksum (=512) must divide block size (=10).
原因是:block大小设置为10时,相应的dfs.bytes-per-checksum 也要重新设置
解决:
conf.set("dfs.bytes-per-checksum", "10");//设置校验和为10
4.seek操作
/*
* 通过FileSystem API做read操作, 设置位置信息seek()
* 总结:FSDataInputStream:可执行Seekable(),
* 而FSDataOutputStream没有,
*/
@Test
public void seekByAPI() throws IOException{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(conf);
Path file=new Path("/spaceQuota/hello.txt");
FSDataInputStream in=fs.open(file);
IOUtils.copyBytes(in,System.out,4096,false);//注意:流不能关闭
in.seek(0);//使输入指针回到0,相当于又读了一遍到控制台
IOUtils.copyBytes(in,System.out,4096,true);
}
结果:
5.测试一致模型 :描述文件读/写的数据可见性
---------------------------------------------------------------------------------------
/**
* 通过读写,测试文件系统的一致模型
*/
@Test
public void writeByAPIText() throws IOException{
Configuration conf = new Configuration();
conf.set("dfs.bytes-per-checksum", "10");
FileSystem fs =FileSystem.get(conf);
Path file = new Path("/spaceQuota/hello.txt");
//创建文件时,文件立即可见
FSDataOutputStream out = fs.create(file,true,4096,(short)2,10);
out.write("hello world1a".getBytes());
out.flush();
out.write("hello world2a".getBytes());
out.hflush();
out.write("hello world3a".getBytes());
out.hsync();
out.close();
}
@Test
public void readByAPIText() throws IOException{
Configuration conf = new Configuration();
FileSystem fs =FileSystem.get(conf);
Path file = new Path("/spaceQuota/hello.txt");
FSDataInputStream in = fs.open(file);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] b = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(b))!=-1) {
out.write(b, 0, bytesRead);
}
System.out.println(new String(out.toByteArray()));
// IOUtils.copyBytes(in, System.out, 4096,true);
}
6.append追加操作
---------------------------------------------------------------------------------------------
/*
* 注意:如集群节点少于3个,会抛异常;解决方案修改
【dfs.client.block.write.replace-datanode-on-failure.policy=NEVER】
*/
@Test
public void appendByAPI() throws IOException{
Configuration conf = new Configuration();
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); //修改属性参数
FileSystem fs = FileSystem.get(conf);
Path file = new Path("/spaceQuota/hello.txt");
FSDataOutputStream out = fs.append(file);
out.writeChars("aaaa");
out.close();
}