TOP

java实现对HDFS增删改查(CRUD)等操作
2018-11-22 16:21:39 】 浏览:120
Tags:java 实现 HDFS 删改 CRUD 操作

实现对HDFS增删改查CRUD等操作

1 查找

列出某个目录下的文件名称,hdfs命令如下所示:

hdfs dfs -ls /usr/app

java代码片段:

  1. publicvoidlist(StringsrcPath){
  2. Configurationconf=newConfiguration();
  3. LOG.info("[Defaultfs]:"+conf.get("fs.default.name"));
  4. conf.set("hadoop.job.ugi","app,app");//Itisnotnecessaryforthedefaultuser.
  5. FileSystemfs;
  6. try{
  7. fs=FileSystem.get(conf);
  8. RemoteIterator<LocatedFileStatus>rmIterator=fs.listLocatedStatus(newPath(srcPath));
  9. while(rmIterator.hasNext()){
  10. Pathpath=rmIterator.next().getPath();
  11. if(fs.isDirectory(path)){
  12. LOG.info("-----------DirectoryName:"+path.getName());
  13. }
  14. elseif(fs.isFile(path)){
  15. LOG.info("-----------FileName:"+path.getName());
  16. }
  17. }
  18. }catch(IOExceptione){
  19. LOG.error("listfileSysetmobjectstream.:",e);
  20. newRuntimeException(e);
  21. }
  22. }


输出结果:

2014-03-11 22:38:15,329 INFO (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt

2014-03-11 22:38:15,331 INFO (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost

2014-03-11 22:38:15,333 INFO (com.hdfs.client.SyncDFS:45) ------------Directory Name: test

读取文件中的内容,hdfs命令如下:

hdfs dfs -cat /input

java 代码:

  1. publicvoidreadFile(Stringfile){
  2. Configurationconf=newConfiguration();
  3. FileSystemfs;
  4. try{
  5. fs=FileSystem.get(conf);
  6. Pathpath=newPath(file);
  7. if(!fs.exists(path)){
  8. LOG.warn("file'"+file+"'doesn'texist!");
  9. return;
  10. }
  11. FSDataInputStreamin=fs.open(path);
  12. Stringfilename=file.substring(file.lastIndexOf('/')+1,file.length());
  13. OutputStreamout=newBufferedOutputStream(newFileOutputStream(
  14. newFile(filename)));
  15. byte[]b=newbyte[1024];
  16. intnumBytes=0;
  17. while((numBytes=in.read(b))>0){
  18. out.write(b,0,numBytes);
  19. }
  20. in.close();
  21. out.close();
  22. fs.close();
  23. }catch(IOExceptione){
  24. LOG.error("ifExistsfsExceptioncaught!:",e);
  25. newRuntimeException(e);
  26. }
  27. }


获取文件的修改时间,java代码:

  1. /**
  2. *Getstheinformationaboutthefilemodifiedtime.
  3. *@paramsource
  4. *@throwsIOException
  5. */
  6. publicvoidgetModificationTime(Stringsource)throwsIOException{
  7. Configurationconf=newConfiguration();
  8. FileSystemfs=FileSystem.get(conf);
  9. PathsrcPath=newPath(source);
  10. //Checkifthefilealreadyexists
  11. if(!(fs.exists(srcPath))){
  12. System.out.println("Nosuchdestination"+srcPath);
  13. return;
  14. }
  15. //Getthefilenameoutofthefilepath
  16. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  17. FileStatusfileStatus=fs.getFileStatus(srcPath);
  18. longmodificationTime=fileStatus.getModificationTime();
  19. LOG.info("modifieddatetime:"+System.out.format("File%s;Modificationtime:%0.2f%n",filename,modificationTime));
  20. }


获取文件块定位信息,java代码:

  1. /**
  2. *Getsthefileblocklocationinfo
  3. *@paramsource
  4. *@throwsIOException
  5. */
  6. publicvoidgetBlockLocations(Stringsource)throwsIOException{
  7. Configurationconf=newConfiguration();
  8. FileSystemfs=FileSystem.get(conf);
  9. PathsrcPath=newPath(source);
  10. //Checkifthefilealreadyexists
  11. if(!(ifExists(source))){
  12. System.out.println("Nosuchdestination"+srcPath);
  13. return;
  14. }
  15. //Getthefilenameoutofthefilepath
  16. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  17. FileStatusfileStatus=fs.getFileStatus(srcPath);
  18. BlockLocation[]blkLocations=fs.getFileBlockLocations(fileStatus,0,fileStatus.getLen());
  19. intblkCount=blkLocations.length;
  20. System.out.println("File:"+filename+"storedat:");
  21. for(inti=0;i<blkCount;i++){
  22. String[]hosts=blkLocations[i].getHosts();
  23. LOG.info("hostip:"+System.out.format("Host%d:%s%n",i,hosts));
  24. }
  25. }


获取Hadoop集群中data node的DNS主机名,java代码:

  1. publicvoidgetHostnames()throwsIOException{
  2. Configurationconfig=newConfiguration();
  3. FileSystemfs=FileSystem.get(config);
  4. DistributedFileSystemhdfs=(DistributedFileSystem)fs;
  5. DatanodeInfo[]dataNodeStats=hdfs.getDataNodeStats();
  6. String[]names=newString[dataNodeStats.length];
  7. for(inti=0;i<dataNodeStats.length;i++){
  8. names[i]=dataNodeStats[i].getHostName();
  9. LOG.info("datenodehostname:"+(dataNodeStats[i].getHostName()));
  10. }
  11. }


2 创建

创建一个目录,指定具体的文件路径。hdfs命令如下:

  1. hdfs dfs -mkdir /usr/app/tmp


java代码:

  1. publicvoidmkdir(Stringdir){
  2. Configurationconf=newConfiguration();
  3. FileSystemfs=null;
  4. try{
  5. fs=FileSystem.get(conf);
  6. Pathpath=newPath(dir);
  7. if(!fs.exists(path)){
  8. fs.mkdirs(path);
  9. LOG.debug("createdirectory'"+dir+"'successfully!");
  10. }else{
  11. LOG.debug("directory'"+dir+"'exits!");
  12. }
  13. }catch(IOExceptione){
  14. LOG.error("FileSystemgetconfigurationwithanerror");
  15. e.printStackTrace();
  16. }finally{
  17. if(fs!=null){
  18. try{
  19. fs.close();
  20. }catch(IOExceptione){
  21. LOG.error("closefsobjectstream.:",e);
  22. newRuntimeException(e);
  23. }
  24. }
  25. }
  26. }

将本地文件上传到hdfs上去,java代码如下:

  1. publicvoidcopyFromLocal(Stringsource,Stringdest){
  2. Configurationconf=newConfiguration();
  3. FileSystemfs;
  4. try{
  5. fs=FileSystem.get(conf);
  6. PathsrcPath=newPath(source);
  7. PathdstPath=newPath(dest);
  8. //Checkifthefilealreadyexists
  9. if(!(fs.exists(dstPath))){
  10. LOG.warn("dstPathpathdoesn'texist");
  11. LOG.error("Nosuchdestination"+dstPath);
  12. return;
  13. }
  14. //Getthefilenameoutofthefilepath
  15. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  16. try{
  17. //ifthefileexistsinthedestinationpath,itwillthrowexception.
  18. //fs.copyFromLocalFile(srcPath,dstPath);
  19. //removeandoverwritefileswiththemethod
  20. //copyFromLocalFile(booleandelSrc,booleanoverwrite,Pathsrc,Pathdst)
  21. fs.copyFromLocalFile(false,true,srcPath,dstPath);
  22. LOG.info("File"+filename+"copiedto"+dest);
  23. }catch(Exceptione){
  24. LOG.error("copyFromLocalFileexceptioncaught!:",e);
  25. newRuntimeException(e);
  26. }finally{
  27. fs.close();
  28. }
  29. }catch(IOExceptione1){
  30. LOG.error("copyFromLocalIOExceptionobjectstream.:",e1);
  31. newRuntimeException(e1);
  32. }
  33. }


添加一个文件到指定的目录下,java代码如下:

  1. publicvoidaddFile(Stringsource,Stringdest){
  2. //ConfobjectwillreadtheHDFSconfigurationparameters
  3. Configurationconf=newConfiguration();
  4. FileSystemfs;
  5. try{
  6. fs=FileSystem.get(conf);
  7. //Getthefilenameoutofthefilepath
  8. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  9. //Createthedestinationpathincludingthefilename.
  10. if(dest.charAt(dest.length()-1)!='/'){
  11. dest=dest+"/"+filename;
  12. }else{
  13. dest=dest+filename;
  14. }
  15. //Checkifthefilealreadyexists
  16. Pathpath=newPath(dest);
  17. if(fs.exists(path)){
  18. LOG.error("File"+dest+"alreadyexists");
  19. return;
  20. }
  21. //Createanewfileandwritedatatoit.
  22. FSDataOutputStreamout=fs.create(path);
  23. InputStreamin=newBufferedInputStream(newFileInputStream(
  24. newFile(source)));
  25. byte[]b=newbyte[1024];
  26. intnumBytes=0;
  27. //Inthiswayreadandwritedatatodestinationfile.
  28. while((numBytes=in.read(b))>0){
  29. out.write(b,0,numBytes);
  30. }
  31. in.close();
  32. out.close();
  33. fs.close();
  34. }catch(IOExceptione){
  35. LOG.error("addFileExceptioncaught!:",e);
  36. newRuntimeException(e);
  37. }
  38. }


3 修改

重新命名hdfs中的文件名称,java代码如下:

  1. publicvoidrenameFile(Stringfromthis,Stringtothis){
  2. Configurationconf=newConfiguration();
  3. FileSystemfs;
  4. try{
  5. fs=FileSystem.get(conf);
  6. PathfromPath=newPath(fromthis);
  7. PathtoPath=newPath(tothis);
  8. if(!(fs.exists(fromPath))){
  9. LOG.info("Nosuchdestination"+fromPath);
  10. return;
  11. }
  12. if(fs.exists(toPath)){
  13. LOG.info("Alreadyexists!"+toPath);
  14. return;
  15. }
  16. try{
  17. booleanisRenamed=fs.rename(fromPath,toPath);//renamesfilenameindeed.
  18. if(isRenamed){
  19. LOG.info("Renamedfrom"+fromthis+"to"+tothis);
  20. }
  21. }catch(Exceptione){
  22. LOG.error("renameFileExceptioncaught!:",e);
  23. newRuntimeException(e);
  24. }finally{
  25. fs.close();
  26. }
  27. }catch(IOExceptione1){
  28. LOG.error("fsExceptioncaught!:",e1);
  29. newRuntimeException(e1);
  30. }
  31. }

4 删除

在hdfs上,删除指定的一个文件。Java代码:

  1. publicvoiddeleteFile(Stringfile){
  2. Configurationconf=newConfiguration();
  3. FileSystemfs;
  4. try{
  5. fs=FileSystem.get(conf);
  6. Pathpath=newPath(file);
  7. if(!fs.exists(path)){
  8. LOG.info("File"+file+"doesnotexists");
  9. return;
  10. }
  11. /*
  12. *recursivelydeletethefile(s)ifitisadirectory.
  13. *Ifyouwanttomarkthepaththatwillbedeletedas
  14. *aresultofclosingtheFileSystem.
  15. *deleteOnExit(Pathf)
  16. */
  17. fs.delete(newPath(file),true);
  18. fs.close();
  19. }catch(IOExceptione){
  20. LOG.error("deleteFileExceptioncaught!:",e);
  21. newRuntimeException(e);
  22. }
  23. }


Appendix 完整代码

  import java.io.BufferedInputStream;
  import java.io.BufferedOutputStream;
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.util.Arrays;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.BlockLocation;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocatedFileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RemoteIterator;
  import org.apache.hadoop.hdfs.DistributedFileSystem;
  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

  22. publicclassSyncDFS{
  23. privatestaticfinalLogLOG=LogFactory.getLog(SyncDFS.class);
  24. /**
  25. *Readsthedirectoryname(s)andfilename(s)fromthespecifiedparameter"srcPath"
  26. *@paramsrcPath
  27. */
  28. publicvoidlist(StringsrcPath){
  29. Configurationconf=newConfiguration();
  30. LOG.info("[Defaultfs]:"+conf.get("fs.default.name"));
  31. //conf.set("hadoop.job.ugi","app,app");//Itisnotnecessaryforthedefaultuser.
  32. FileSystemfs;
  33. try{
  34. fs=FileSystem.get(conf);
  35. RemoteIterator<LocatedFileStatus>rmIterator=fs.listLocatedStatus(newPath(srcPath));
  36. while(rmIterator.hasNext()){
  37. Pathpath=rmIterator.next().getPath();
  38. if(fs.isDirectory(path)){
  39. LOG.info("-----------DirectoryName:"+path.getName());
  40. }
  41. elseif(fs.isFile(path)){
  42. LOG.info("-----------FileName:"+path.getName());
  43. }
  44. }
  45. }catch(IOExceptione){
  46. LOG.error("listfileSysetmobjectstream.:",e);
  47. newRuntimeException(e);
  48. }
  49. }
  50. /**
  51. *Makesthespecifieddirectoryifitdoesn'texist.
  52. *@paramdir
  53. */
  54. publicvoidmkdir(Stringdir){
  55. Configurationconf=newConfiguration();
  56. FileSystemfs=null;
  57. try{
  58. fs=FileSystem.get(conf);
  59. Pathpath=newPath(dir);
  60. if(!fs.exists(path)){
  61. fs.mkdirs(path);
  62. LOG.debug("createdirectory'"+dir+"'successfully!");
  63. }else{
  64. LOG.debug("directory'"+dir+"'exits!");
  65. }
  66. }catch(IOExceptione){
  67. LOG.error("FileSystemgetconfigurationwithanerror");
  68. e.printStackTrace();
  69. }finally{
  70. if(fs!=null){
  71. try{
  72. fs.close();
  73. }catch(IOExceptione){
  74. LOG.error("closefsobjectstream.:",e);
  75. newRuntimeException(e);
  76. }
  77. }
  78. }
  79. }
  80. /**
  81. *Readsthefilecontentinconsole.
  82. *@paramfile
  83. */
  84. publicvoidreadFile(Stringfile){
  85. Configurationconf=newConfiguration();
  86. FileSystemfs;
  87. try{
  88. fs=FileSystem.get(conf);
  89. Pathpath=newPath(file);
  90. if(!fs.exists(path)){
  91. LOG.warn("file'"+file+"'doesn'texist!");
  92. return;
  93. }
  94. FSDataInputStreamin=fs.open(path);
  95. Stringfilename=file.substring(file.lastIndexOf('/')+1,file.length());
  96. OutputStreamout=newBufferedOutputStream(newFileOutputStream(
  97. newFile(filename)));
  98. byte[]b=newbyte[1024];
  99. intnumBytes=0;
  100. while((numBytes=in.read(b))>0){
  101. out.write(b,0,numBytes);
  102. }
  103. in.close();
  104. out.close();
  105. fs.close();
  106. }catch(IOExceptione){
  107. LOG.error("ifExistsfsExceptioncaught!:",e);
  108. newRuntimeException(e);
  109. }
  110. }
  111. publicbooleanifExists(Stringsource){
  112. if(source==null||source.length()==0){
  113. returnfalse;
  114. }
  115. Configurationconf=newConfiguration();
  116. FileSystemfs=null;
  117. try{
  118. fs=FileSystem.get(conf);
  119. LOG.debug("judgefile'"+source+"'");
  120. returnfs.exists(newPath(source));
  121. }catch(IOExceptione){
  122. LOG.error("ifExistsfsExceptioncaught!:",e);
  123. newRuntimeException(e);
  124. returnfalse;
  125. }finally{
  126. if(fs!=null){
  127. try{
  128. fs.close();
  129. }catch(IOExceptione){
  130. LOG.error("fs.closeExceptioncaught!:",e);
  131. newRuntimeException(e);
  132. }
  133. }
  134. }
  135. }
  136. /**
  137. *RecursivelycopiesthesourcepathdirectoriesorfilestothedestinationpathofDFS.
  138. *Itisthesamefunctionalityasthefollowingcomand:
  139. *hadoopfs-copyFromLocal<localfs><hadoopfs>
  140. *@paramsource
  141. *@paramdest
  142. */
  143. publicvoidcopyFromLocal(Stringsource,Stringdest){
  144. Configurationconf=newConfiguration();
  145. FileSystemfs;
  146. try{
  147. fs=FileSystem.get(conf);
  148. PathsrcPath=newPath(source);
  149. PathdstPath=newPath(dest);
  150. //Checkifthefilealreadyexists
  151. if(!(fs.exists(dstPath))){
  152. LOG.warn("dstPathpathdoesn'texist");
  153. LOG.error("Nosuchdestination"+dstPath);
  154. return;
  155. }
  156. //Getthefilenameoutofthefilepath
  157. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  158. try{
  159. //ifthefileexistsinthedestinationpath,itwillthrowexception.
  160. //fs.copyFromLocalFile(srcPath,dstPath);
  161. //removeandoverwritefileswiththemethod
  162. //copyFromLocalFile(booleandelSrc,booleanoverwrite,Pathsrc,Pathdst)
  163. fs.copyFromLocalFile(false,true,srcPath,dstPath);
  164. LOG.info("File"+filename+"copiedto"+dest);
  165. }catch(Exceptione){
  166. LOG.error("copyFromLocalFileexceptioncaught!:",e);
  167. newRuntimeException(e);
  168. }finally{
  169. fs.close();
  170. }
  171. }catch(IOExceptione1){
  172. LOG.error("copyFromLocalIOExceptionobjectstream.:",e1);
  173. newRuntimeException(e1);
  174. }
  175. }
  176. publicvoidrenameFile(Stringfromthis,Stringtothis){
  177. Configurationconf=newConfiguration();
  178. FileSystemfs;
  179. try{
  180. fs=FileSystem.get(conf);
  181. PathfromPath=newPath(fromthis);
  182. PathtoPath=newPath(tothis);
  183. if(!(fs.exists(fromPath))){
  184. LOG.info("Nosuchdestination"+fromPath);
  185. return;
  186. }
  187. if(fs.exists(toPath)){
  188. LOG.info("Alreadyexists!"+toPath);
  189. return;
  190. }
  191. try{
  192. booleanisRenamed=fs.rename(fromPath,toPath);//renamesfilenameindeed.
  193. if(isRenamed){
  194. LOG.info("Renamedfrom"+fromthis+"to"+tothis);
  195. }
  196. }catch(Exceptione){
  197. LOG.error("renameFileExceptioncaught!:",e);
  198. newRuntimeException(e);
  199. }finally{
  200. fs.close();
  201. }
  202. }catch(IOExceptione1){
  203. LOG.error("fsExceptioncaught!:",e1);
  204. newRuntimeException(e1);
  205. }
  206. }
  207. /**
  208. *UploadsoraddsafiletoHDFS
  209. *@paramsource
  210. *@paramdest
  211. */
  212. publicvoidaddFile(Stringsource,Stringdest){
  213. //ConfobjectwillreadtheHDFSconfigurationparameters
  214. Configurationconf=newConfiguration();
  215. FileSystemfs;
  216. try{
  217. fs=FileSystem.get(conf);
  218. //Getthefilenameoutofthefilepath
  219. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  220. //Createthedestinationpathincludingthefilename.
  221. if(dest.charAt(dest.length()-1)!='/'){
  222. dest=dest+"/"+filename;
  223. }else{
  224. dest=dest+filename;
  225. }
  226. //Checkifthefilealreadyexists
  227. Pathpath=newPath(dest);
  228. if(fs.exists(path)){
  229. LOG.error("File"+dest+"alreadyexists");
  230. return;
  231. }
  232. //Createanewfileandwritedatatoit.
  233. FSDataOutputStreamout=fs.create(path);
  234. InputStreamin=newBufferedInputStream(newFileInputStream(
  235. newFile(source)));
  236. byte[]b=newbyte[1024];
  237. intnumBytes=0;
  238. //Inthiswayreadandwritedatatodestinationfile.
  239. while((numBytes=in.read(b))>0){
  240. out.write(b,0,numBytes);
  241. }
  242. in.close();
  243. out.close();
  244. fs.close();
  245. }catch(IOExceptione){
  246. LOG.error("addFileExceptioncaught!:",e);
  247. newRuntimeException(e);
  248. }
  249. }
  250. /**
  251. *Deletesthefilesifitisadirectory.
  252. *@paramfile
  253. */
  254. publicvoiddeleteFile(Stringfile){
  255. Configurationconf=newConfiguration();
  256. FileSystemfs;
  257. try{
  258. fs=FileSystem.get(conf);
  259. Pathpath=newPath(file);
  260. if(!fs.exists(path)){
  261. LOG.info("File"+file+"doesnotexists");
  262. return;
  263. }
  264. /*
  265. *recursivelydeletethefile(s)ifitisadirectory.
  266. *Ifyouwanttomarkthepaththatwillbedeletedas
  267. *aresultofclosingtheFileSystem.
  268. *deleteOnExit(Pathf)
  269. */
  270. fs.delete(newPath(file),true);
  271. fs.close();
  272. }catch(IOExceptione){
  273. LOG.error("deleteFileExceptioncaught!:",e);
  274. newRuntimeException(e);
  275. }
  276. }
  277. /**
  278. *Getstheinformationaboutthefilemodifiedtime.
  279. *@paramsource
  280. *@throwsIOException
  281. */
  282. publicvoidgetModificationTime(Stringsource)throwsIOException{
  283. Configurationconf=newConfiguration();
  284. FileSystemfs=FileSystem.get(conf);
  285. PathsrcPath=newPath(source);
  286. //Checkifthefilealreadyexists
  287. if(!(fs.exists(srcPath))){
  288. System.out.println("Nosuchdestination"+srcPath);
  289. return;
  290. }
  291. //Getthefilenameoutofthefilepath
  292. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  293. FileStatusfileStatus=fs.getFileStatus(srcPath);
  294. longmodificationTime=fileStatus.getModificationTime();
  295. LOG.info("modifieddatetime:"+System.out.format("File%s;Modificationtime:%0.2f%n",filename,modificationTime));
  296. }
  297. /**
  298. *Getsthefileblocklocationinfo
  299. *@paramsource
  300. *@throwsIOException
  301. */
  302. publicvoidgetBlockLocations(Stringsource)throwsIOException{
  303. Configurationconf=newConfiguration();
  304. FileSystemfs=FileSystem.get(conf);
  305. PathsrcPath=newPath(source);
  306. //Checkifthefilealreadyexists
  307. if(!(ifExists(source))){
  308. System.out.println("Nosuchdestination"+srcPath);
  309. return;
  310. }
  311. //Getthefilenameoutofthefilepath
  312. Stringfilename=source.substring(source.lastIndexOf('/')+1,source.length());
  313. FileStatusfileStatus=fs.getFileStatus(srcPath);
  314. BlockLocation[]blkLocations=fs.getFileBlockLocations(fileStatus,0,fileStatus.getLen());
  315. intblkCount=blkLocations.length;
  316. System.out.println("File:"+filename+"storedat:");
  317. for(inti=0;i<blkCount;i++){
  318. String[]hosts=blkLocations[i].getHosts();
  319. LOG.info("hostip:"+System.out.format("Host%d:%s%n",i,hosts));
  320. }
  321. }
  322. publicvoidgetHostnames()throwsIOException{
  323. Configurationconfig=newConfiguration();
  324. FileSystemfs=FileSystem.get(config);
  325. DistributedFileSystemhdfs=(DistributedFileSystem)fs;
  326. DatanodeInfo[]dataNodeStats=hdfs.getDataNodeStats();
  327. String[]names=newString[dataNodeStats.length];
  328. for(inti=0;i<dataNodeStats.length;i++){
  329. names[i]=dataNodeStats[i].getHostName();
  330. LOG.info("datenodehostname:"+(dataNodeStats[i].getHostName()));
  331. }
  332. }
  333. /**
  334. *@paramargs
  335. */
  336. publicstaticvoidmain(String[]args){
  337. SyncDFSdfs=newSyncDFS();
  338. dfs.list("/user/app");
  339. dfs.mkdir("/user/app");
  340. //dfs.readFile("/user/app/README.txt");
  341. LOG.info("--------------"+
  342. dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data"));//false
  343. LOG.info("--------------"+dfs.ifExists("/user/app/README.txt"));//true
  344. //copiedthelocalfile(s)tothedfs.
  345. //dfs.copyFromLocal("/opt/test","/user/app");
  346. //deletethefile(s)fromthedfs
  347. //dfs.deleteFile("/user/app/test");
  348. //renamediretoryindfs
  349. //dfs.renameFile("/user/app/test","/user/app/log");
  350. //renamefileindfs
  351. //dfs.renameFile("/user/app/log/derby.log","/user/app/log/derby_info.log");
  352. }
  353. }

java实现对HDFS增删改查(CRUD)等操作 https://www.cppentry.com/bencandy.php?fid=115&id=185189

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HDFS查看异常:Operation categor.. 下一篇Hadoop DataNode 无法启动,报错..