
Connecting to HDFS from Python with pyhdfs: a pyhdfs usage guide (with code and output)
2018-11-29 16:19:33

[Original] pyhdfs usage guide, with code and output

Writing this took effort; if you repost it, please credit the source. Thanks!

Official pyhdfs documentation: http://pyhdfs.readthedocs.io/en/latest/pyhdfs.html#pyhdfs.HdfsClient

1. The HdfsClient class

The HdfsClient class is the core of pyhdfs. It connects to the HDFS NameNode over WebHDFS and lets you query, read, and write files on HDFS.

In[1]:

import pyhdfs

class pyhdfs.HdfsClient(hosts=u'localhost', randomize_hosts=True, user_name=None, timeout=20, max_tries=2, retry_delay=5, requests_session=None, requests_kwargs=None)

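Parameters:

  • hosts: NameNode addresses as "host:port" strings, where the port is the NameNode's HTTP (WebHDFS) port, e.g. hosts="45.91.43.237:50070". The port defaults to 50070 if omitted. Several NameNodes can be given as a comma-separated string or as a list, e.g. ["47.95.45.254:50070","47.95.45.235:50070"]. A comma separates hosts, not host and port, so "45.91.43.237,9000" is read as two hosts.
  • randomize_hosts: pick the host to connect to at random; default True
  • user_name: the user name on the Hadoop cluster to connect as
  • timeout: seconds to wait on a single NameNode before moving on; default 20
  • max_tries: maximum number of attempts per NameNode for a request; default 2
  • retry_delay: seconds to wait before going through the NameNodes again after they have failed; default 5
  • requests_session: a requests session to use for the HTTP requests sent to HDFS; default None
  • requests_kwargs: additional keyword arguments passed on to requests; default None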
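Since the hosts format is easy to get wrong, here is a minimal sketch of building it from (host, port) pairs. format_hosts is a hypothetical helper, not part of pyhdfs, and 50070 is only assumed to be the NameNode HTTP port of this cluster (Hadoop 3 clusters typically use 9870 instead).

```python
def format_hosts(pairs):
    """Turn (host, port) pairs into the "host:port" strings HdfsClient expects.

    Hypothetical helper for illustration; not part of pyhdfs.
    """
    return ["{}:{}".format(host, port) for host, port in pairs]


# Example addresses from the parameter list above; 50070 is assumed to be
# the NameNode HTTP (WebHDFS) port on this cluster.
hosts = format_hosts([("47.95.45.254", 50070), ("47.95.45.235", 50070)])
print(hosts)
# The resulting list can be passed directly as HdfsClient(hosts=hosts, ...).
```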

In[0]:

# Example
# hosts uses "host:port" with the NameNode HTTP (WebHDFS) port
client = pyhdfs.HdfsClient(hosts="45.91.43.237:50070", user_name="hadoop")

2. Return the user's home directory

get_home_directory(**kwargs)

In[ 2 ]:

# Return this user's home directory
print(client.get_home_directory())

Out [ 2 ]:

/user/hadoop

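Note: before connecting, map the cluster nodes' IP addresses to their hostnames in the local machine's hosts file, otherwise calls will fail. WebHDFS redirects reads and writes to DataNodes by hostname, so the client must be able to resolve those names.

A detailed fix is described here: https://blog.csdn.net/u010234516/article/details/52963954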

3. Return the active NameNode

get_active_namenode(max_staleness=None)

In[3]:

# Return the active NameNode
print(client.get_active_namenode())

Out[3]:

45.91.43.237:50070

4. List everything in a directory

listdir(path, **kwargs)

In[5]:

# List the files and directories under the given path
print(client.listdir("/user/hadoop"))

Out[5]:

[u'.password', u'.sparkStaging', u'QuasiMonteCarlo_1525339502176_165201397', u'QuasiMonteCarlo_1525340283182_484907947', u'QuasiMonteCarlo_1525340542994_724956601', u'QuasiMonteCarlo_1525428514052_1305531458', u'QuasiMonteCarlo_1525428870962_320046470', u'QuasiMonteCarlo_1525429827638_1734729002', u'QuasiMonteCarlo_1525430442752_1819520486', u'QuasiMonteCarlo_1525430754280_1904667948', u'QuasiMonteCarlo_1525431222757_1446112904', u'QuasiMonteCarlo_1525431511572_67243213', u'QuasiMonteCarlo_1525437383596_1909178162', u'_sqoop', u'ceshi', u'exercise1.txt', u'exercise1map.py', u'exercise1reduce.py', u'speech_text.txt']

5. Open a file on a remote node; returns an HTTPResponse object

open(path, **kwargs)

In[14]:

# Open a file on a remote node; this returns an HTTPResponse object
response = client.open("/user/hadoop/speech_text.txt")

# Read the file contents
response.read()

Out[14]:

'Fellow-citizens, being fully invested with that high office to which the partiality of my countrymen has called me, I now take an affectionate leave of you. You will bear with you to your homes the remembrance of the pledge I have this day given to discharge all the high duties of my exalted station according to the best of my ability, and I shall enter upon their performance with entire confidence in the support of a just and generous people.\n hello! there\'s a new message!\n\n hello! there\'s a new message!\n'
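The open-then-read pattern above can be wrapped so the response is always closed and the bytes are decoded to text. This is a minimal Python 3 sketch: read_text and FakeClient are hypothetical names introduced here, UTF-8 is an assumed encoding, and the stand-in client only exists so the example runs without a cluster.

```python
import contextlib
import io


def read_text(client, path, encoding="utf-8"):
    """Read a whole HDFS file and return it as text.

    `client` is expected to behave like pyhdfs.HdfsClient: open() returns a
    file-like response whose read() yields raw bytes. UTF-8 is an assumption.
    """
    with contextlib.closing(client.open(path)) as response:
        return response.read().decode(encoding)


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, files):
        self._files = files

    def open(self, path):
        return io.BytesIO(self._files[path])


demo = FakeClient({"/user/hadoop/speech_text.txt": b"hello! there's a new message!\n"})
text = read_text(demo, "/user/hadoop/speech_text.txt")
print(text)
```

With a real connection, the same helper would be called as read_text(client, "/user/hadoop/speech_text.txt").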

6. Upload a file from the local machine to the cluster

copy_from_local(localsrc, dest, **kwargs)

7. Copy a file from the cluster to the local machine

copy_to_local(src, localdest, **kwargs)

In[15]:

# Directory listing on the cluster before the upload
print("Before copy_from_local")
print(client.listdir("/user/hadoop"))

# Upload a local file to the cluster
client.copy_from_local("D:/Jupyter notebook/ipynb_materials/src/test.csv","/user/hadoop/test.csv")

# Directory listing on the cluster after the upload
print("After copy_from_local")
print(client.listdir("/user/hadoop"))

Out[15]:

Before copy_from_local
[u'.password', u'.sparkStaging', u'QuasiMonteCarlo_1525339502176_165201397', u'QuasiMonteCarlo_1525340283182_484907947', u'QuasiMonteCarlo_1525340542994_724956601', u'QuasiMonteCarlo_1525428514052_1305531458', u'QuasiMonteCarlo_1525428870962_320046470', u'QuasiMonteCarlo_1525429827638_1734729002', u'QuasiMonteCarlo_1525430442752_1819520486', u'QuasiMonteCarlo_1525430754280_1904667948', u'QuasiMonteCarlo_1525431222757_1446112904', u'QuasiMonteCarlo_1525431511572_67243213', u'QuasiMonteCarlo_1525437383596_1909178162', u'_sqoop', u'ceshi', u'exercise1.txt', u'exercise1map.py', u'exercise1reduce.py', u'speech_text.txt']
After copy_from_local
[u'.password', u'.sparkStaging', u'QuasiMonteCarlo_1525339502176_165201397', u'QuasiMonteCarlo_1525340283182_484907947', u'QuasiMonteCarlo_1525340542994_724956601', u'QuasiMonteCarlo_1525428514052_1305531458', u'QuasiMonteCarlo_1525428870962_320046470', u'QuasiMonteCarlo_1525429827638_1734729002', u'QuasiMonteCarlo_1525430442752_1819520486', u'QuasiMonteCarlo_1525430754280_1904667948', u'QuasiMonteCarlo_1525431222757_1446112904', u'QuasiMonteCarlo_1525431511572_67243213', u'QuasiMonteCarlo_1525437383596_1909178162', u'_sqoop', u'ceshi', u'exercise1.txt', u'exercise1map.py', u'exercise1reduce.py', u'speech_text.txt', u'test.csv']
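The cell above only exercises copy_from_local, so here is a hedged sketch of the opposite direction with copy_to_local. download_to_dir and FakeClient are hypothetical names; the stand-in writes fixed bytes to disk so the example runs without a cluster.

```python
import os
import posixpath
import tempfile


def download_to_dir(client, src, local_dir):
    """Copy an HDFS file into local_dir, keeping its file name."""
    # HDFS paths always use "/", so take the file name with posixpath,
    # and build the local path with os.path for the local platform.
    local_path = os.path.join(local_dir, posixpath.basename(src))
    client.copy_to_local(src, local_path)
    return local_path


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, files):
        self._files = files

    def copy_to_local(self, src, localdest):
        with open(localdest, "wb") as handle:
            handle.write(self._files[src])


demo = FakeClient({"/user/hadoop/test.csv": b"n,n+2,n*2\n0,2,0\n"})
saved = download_to_dir(demo, "/user/hadoop/test.csv", tempfile.mkdtemp())
print(saved)
```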

8. Append text to an existing file

append(path, data, **kwargs)

9. Concatenate files

concat(target, sources, **kwargs)

In[16]:

# Append text to an existing file
# First, look at the current contents of the file
response = client.open("/user/hadoop/test.csv")
response.read()

Out[16]:

'n,n+2,n*2\r\r\n0,2,0\r\r\n1,3,2\r\r\n2,4,4\r\r\n3,5,6\r\r\n4,6,8\r\r\n5,7,10\r\r\n6,8,12\r\r\n7,9,14\r\r\n8,10,16\r\r\n9,11,18\r\r\n'

In[17]:

# Use append to add a string to the end of the file
client.append("/user/hadoop/test.csv","0,2,0\r\r\n")

# Look at the contents of the file again
response = client.open("/user/hadoop/test.csv")
response.read()

Out[17]:

'n,n+2,n*2\r\r\n0,2,0\r\r\n1,3,2\r\r\n2,4,4\r\r\n3,5,6\r\r\n4,6,8\r\r\n5,7,10\r\r\n6,8,12\r\r\n7,9,14\r\r\n8,10,16\r\r\n9,11,18\r\r\n0,2,0\r\r\n'
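concat is listed above but not run, so the following is only a sketch of how it might be called. concat_parts, FakeClient, and the part-N paths are hypothetical; the stand-in mimics HDFS in that the sources are merged into the target and then no longer exist. HDFS places preconditions on concat that depend on the Hadoop version, so the real call should be tried on a test directory first.

```python
def concat_parts(client, target, sources):
    """Merge the files in `sources` into `target`, in order, using concat()."""
    # concat() takes a list of source paths, not a single string.
    if isinstance(sources, str):
        raise ValueError("sources must be a list of paths, not a single string")
    client.concat(target, list(sources))


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, files):
        self.files = dict(files)

    def concat(self, target, sources):
        # Source contents move into the target and the sources disappear.
        for source in sources:
            self.files[target] += self.files.pop(source)


demo = FakeClient({
    "/user/hadoop/data/part-0": b"a\n",
    "/user/hadoop/data/part-1": b"b\n",
    "/user/hadoop/data/part-2": b"c\n",
})
concat_parts(
    demo,
    "/user/hadoop/data/part-0",
    ["/user/hadoop/data/part-1", "/user/hadoop/data/part-2"],
)
print(demo.files)
```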

10. Create a new directory

mkdirs(path, **kwargs)

In[20]:

# Before adding a directory, look at what is under the current path
client.listdir("/user/hadoop/")

Out[20]:

[u'.password',
 u'.sparkStaging',
 u'QuasiMonteCarlo_1525339502176_165201397',
 u'QuasiMonteCarlo_1525340283182_484907947',
 u'QuasiMonteCarlo_1525340542994_724956601',
 u'QuasiMonteCarlo_1525428514052_1305531458',
 u'QuasiMonteCarlo_1525428870962_320046470',
 u'QuasiMonteCarlo_1525429827638_1734729002',
 u'QuasiMonteCarlo_1525430442752_1819520486',
 u'QuasiMonteCarlo_1525430754280_1904667948',
 u'QuasiMonteCarlo_1525431222757_1446112904',
 u'QuasiMonteCarlo_1525431511572_67243213',
 u'QuasiMonteCarlo_1525437383596_1909178162',
 u'_sqoop',
 u'ceshi',
 u'exercise1.txt',
 u'exercise1map.py',
 u'exercise1reduce.py',
 u'speech_text.txt',
 u'test.csv']

In[22]:

# Create the new directory
client.mkdirs("/user/hadoop/data")

Out[22]:

True

In[23]:

# Look at the current path again
# There is now an extra "data" directory
client.listdir("/user/hadoop/")

Out[23]:

[u'.password',
 u'.sparkStaging',
 u'QuasiMonteCarlo_1525339502176_165201397',
 u'QuasiMonteCarlo_1525340283182_484907947',
 u'QuasiMonteCarlo_1525340542994_724956601',
 u'QuasiMonteCarlo_1525428514052_1305531458',
 u'QuasiMonteCarlo_1525428870962_320046470',
 u'QuasiMonteCarlo_1525429827638_1734729002',
 u'QuasiMonteCarlo_1525430442752_1819520486',
 u'QuasiMonteCarlo_1525430754280_1904667948',
 u'QuasiMonteCarlo_1525431222757_1446112904',
 u'QuasiMonteCarlo_1525431511572_67243213',
 u'QuasiMonteCarlo_1525437383596_1909178162',
 u'_sqoop',
 u'ceshi',
 u'data',
 u'exercise1.txt',
 u'exercise1map.py',
 u'exercise1reduce.py',
 u'speech_text.txt',
 u'test.csv']
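When several directories are needed, the paths are best joined with posixpath rather than os.path, because HDFS paths always use "/" while os.path.join uses backslashes on Windows. A minimal sketch follows; make_subdirs, FakeClient, and the "raw"/"clean" names are hypothetical.

```python
import posixpath


def make_subdirs(client, base, names):
    """Create base/name for every name and return the paths that succeeded."""
    created = []
    for name in names:
        # posixpath keeps "/" separators regardless of the local platform.
        path = posixpath.join(base, name)
        if client.mkdirs(path):
            created.append(path)
    return created


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self):
        self.dirs = set()

    def mkdirs(self, path):
        self.dirs.add(path)
        return True


demo = FakeClient()
created = make_subdirs(demo, "/user/hadoop/data", ["raw", "clean"])
print(created)
```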

11. Check whether a file exists

exists(path, **kwargs)

In[29]:

# Check whether the file exists
client.exists("/user/hadoop/test.csv")

Out[29]:

True
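A common use of exists is to guard an upload, since WebHDFS does not overwrite an existing file by default when creating one. The sketch below assumes that behaviour; upload_if_absent, FakeClient, and new.csv are hypothetical names.

```python
def upload_if_absent(client, localsrc, dest):
    """Upload localsrc to dest unless dest already exists on HDFS.

    Returns True if the file was uploaded, False if it was skipped.
    """
    if client.exists(dest):
        return False
    client.copy_from_local(localsrc, dest)
    return True


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, existing):
        self.paths = set(existing)

    def exists(self, path):
        return path in self.paths

    def copy_from_local(self, localsrc, dest):
        self.paths.add(dest)


demo = FakeClient({"/user/hadoop/test.csv"})
skipped = upload_if_absent(demo, "test.csv", "/user/hadoop/test.csv")
uploaded = upload_if_absent(demo, "new.csv", "/user/hadoop/new.csv")
print(skipped, uploaded)
```

The check and the upload are two separate requests, so another writer could still create the file in between; the helper reduces accidental failures rather than ruling them out.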

12. Get a summary of a path

get_content_summary(path, **kwargs)

In[28]:

# Get the content summary for the path
client.get_content_summary("/user/hadoop")

Out[28]:

ContentSummary(spaceQuota=-1, length=268497153, directoryCount=34, spaceConsumed=805491459, quota=-1, fileCount=98)
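The numbers in this summary can be related to each other: length is the logical size of the data, while spaceConsumed also counts every replica. The sketch below uses a namedtuple stand-in with the same field names (an assumption made so it runs without a cluster) and the values from the output above.

```python
from collections import namedtuple

# Stand-in with the same field names as the ContentSummary shown above.
ContentSummary = namedtuple(
    "ContentSummary",
    ["spaceQuota", "length", "directoryCount", "spaceConsumed", "quota", "fileCount"],
)

summary = ContentSummary(
    spaceQuota=-1,
    length=268497153,
    directoryCount=34,
    spaceConsumed=805491459,
    quota=-1,
    fileCount=98,
)


def describe(summary):
    """Return logical size, raw size (both in MiB) and the average replication."""
    mib = 1024.0 * 1024.0
    return {
        "logical_mib": round(summary.length / mib, 1),
        "consumed_mib": round(summary.spaceConsumed / mib, 1),
        "avg_replication": summary.spaceConsumed / float(summary.length),
    }


info = describe(summary)
print(info)
```

Here spaceConsumed is exactly three times length, which matches the replication=3 reported for the files in this directory.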

13. Get a file's checksum

get_file_checksum(path, **kwargs)

In[27]:

# Get the checksum of the file
client.get_file_checksum("/user/hadoop/test.csv")

Out[27]:

FileChecksum(length=28, bytes=u'0000020000000000000000009b79c1de3fbc34132510593a6073ecf500000000', algorithm=u'MD5-of-0MD5-of-512CRC32C')
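One way to use this result is to compare two files on HDFS. The algorithm name shows that this is a composite checksum (MD5 over per-block MD5s of CRC32C values), so it is not the same as a local md5sum and is mainly meaningful between HDFS files written with matching settings. In this sketch, same_checksum, FakeClient, the second path, and its checksum value are all made up for illustration.

```python
from collections import namedtuple

# Stand-in with the same field names as the FileChecksum shown above.
FileChecksum = namedtuple("FileChecksum", ["length", "bytes", "algorithm"])


def same_checksum(client, path_a, path_b):
    """Return True if both HDFS files report the same algorithm and checksum."""
    a = client.get_file_checksum(path_a)
    b = client.get_file_checksum(path_b)
    return (a.algorithm, a.bytes) == (b.algorithm, b.bytes)


class FakeClient(object):
    """In-memory stand-in so the sketch runs without a cluster."""

    def __init__(self, checksums):
        self._checksums = checksums

    def get_file_checksum(self, path):
        return self._checksums[path]


ALGORITHM = "MD5-of-0MD5-of-512CRC32C"
demo = FakeClient({
    "/user/hadoop/test.csv": FileChecksum(
        28, "0000020000000000000000009b79c1de3fbc34132510593a6073ecf500000000", ALGORITHM
    ),
    # Made-up value for a hypothetical second file.
    "/user/hadoop/other.csv": FileChecksum(28, "00" * 32, ALGORITHM),
})
identical = same_checksum(demo, "/user/hadoop/test.csv", "/user/hadoop/test.csv")
different = same_checksum(demo, "/user/hadoop/test.csv", "/user/hadoop/other.csv")
print(identical, different)
```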

14. View the status of a path (directory or file)

list_status(path, **kwargs)

In[24]:

# View the status of the entries under a path
client.list_status("/user/hadoop")

Out[24]:

[FileStatus(group=u'supergroup', permission=u'400', blockSize=134217728, accessTime=1532665989204L, pathSuffix=u'.password', modificationTime=1517972575373L, replication=3, length=4, childrenNum=0, owner=u'hadoop', storagePolicy=0, type=u'FILE', fileId=17768),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'.sparkStaging', modificationTime=1521528004629L, replication=0, length=0, childrenNum=4, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=26735),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525339502176_165201397', modificationTime=1525339503697L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=28309),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525340283182_484907947', modificationTime=1525341538004L, replication=0, length=0, childrenNum=2, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=28326),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525340542994_724956601', modificationTime=1525341600823L, replication=0, length=0, childrenNum=2, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=28343),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525428514052_1305531458', modificationTime=1525428515590L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=29623),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525428870962_320046470', modificationTime=1525428872502L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=29641),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525429827638_1734729002', modificationTime=1525429829220L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=29909),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525430442752_1819520486', modificationTime=1525430444346L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=29926),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525430754280_1904667948', modificationTime=1525430755899L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=29936),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525431222757_1446112904', modificationTime=1525431224390L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=30072),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525431511572_67243213', modificationTime=1525431513121L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=30089),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'QuasiMonteCarlo_1525437383596_1909178162', modificationTime=1525437385222L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=30099),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'_sqoop', modificationTime=1517981304673L, replication=0, length=0, childrenNum=1, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=18255),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'ceshi', modificationTime=1517977450123L, replication=0, length=0, childrenNum=2, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=17847),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=0, accessTime=0, pathSuffix=u'data', modificationTime=1532943534037L, replication=0, length=0, childrenNum=0, owner=u'hadoop', storagePolicy=0, type=u'DIRECTORY', fileId=34289),
 FileStatus(group=u'supergroup', permission=u'644', blockSize=134217728, accessTime=1529049559630L, pathSuffix=u'exercise1.txt', modificationTime=1529049559773L, replication=3, length=109, childrenNum=0, owner=u'jovyan', storagePolicy=0, type=u'FILE', fileId=33021),
 FileStatus(group=u'supergroup', permission=u'644', blockSize=134217728, accessTime=1529049596083L, pathSuffix=u'exercise1map.py', modificationTime=1529049596226L, replication=3, length=1063, childrenNum=0, owner=u'jovyan', storagePolicy=0, type=u'FILE', fileId=33022),
 FileStatus(group=u'supergroup', permission=u'644', blockSize=134217728, accessTime=1529049638764L, pathSuffix=u'exercise1reduce.py', modificationTime=1529049638904L, replication=3, length=456, childrenNum=0, owner=u'jovyan', storagePolicy=0, type=u'FILE', fileId=33023),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=134217728, accessTime=1532939667735L, pathSuffix=u'speech_text.txt', modificationTime=1532940839913L, replication=3, length=49827, childrenNum=0, owner=u'hadoop', storagePolicy=0, type=u'FILE', fileId=34287),
 FileStatus(group=u'supergroup', permission=u'755', blockSize=134217728, accessTime=1532943080708L, pathSuffix=u'test.csv', modificationTime=1532943291036L, replication=3, length=107, childrenNum=0, owner=u'hadoop', storagePolicy=0, type=u'FILE', fileId=34288)]

In[25]:

# View the status of a single file
client.list_status("/user/hadoop/test.csv")

Out[25]:

[FileStatus(group=u'supergroup', permission=u'755', blockSize=134217728, accessTime=1532943080708L, pathSuffix=u'', modificationTime=1532943291036L, replication=3, length=107, childrenNum=0, owner=u'hadoop', storagePolicy=0, type=u'FILE', fileId=34288)]
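The FileStatus entries are easier to work with after a little post-processing. The Python 3 sketch below splits entries into files and directories and converts the millisecond timestamps to datetimes. The namedtuple is a stand-in carrying only the fields used here, filled with three entries from the output above; to_datetime and split_by_type are hypothetical helpers.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Stand-in exposing only the FileStatus fields used below.
FileStatus = namedtuple("FileStatus", ["pathSuffix", "type", "length", "modificationTime"])

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_datetime(millis):
    """Convert an HDFS timestamp (milliseconds since the epoch) to UTC."""
    return EPOCH + timedelta(milliseconds=millis)


def split_by_type(statuses):
    """Return (file names, directory names) from a list_status() result."""
    files = [s.pathSuffix for s in statuses if s.type == "FILE"]
    dirs = [s.pathSuffix for s in statuses if s.type == "DIRECTORY"]
    return files, dirs


statuses = [
    FileStatus("data", "DIRECTORY", 0, 1532943534037),
    FileStatus("speech_text.txt", "FILE", 49827, 1532940839913),
    FileStatus("test.csv", "FILE", 107, 1532943291036),
]
files, dirs = split_by_type(statuses)
modified = to_datetime(statuses[2].modificationTime)
print(files, dirs, modified.isoformat())
```

Note that when list_status is called on a single file, as in Out[25], pathSuffix is empty, so the file name has to come from the path that was passed in.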

Those are the commonly used pyhdfs commands. Thanks for reading.

