6. Spark Learning (Python Version): Reading and Writing the HBase Database
Step 1. Create an HBase table
Start Hadoop from the /usr/local/hadoop directory: ./sbin/start-dfs.sh
Start HBase from the /usr/local/hbase directory: ./bin/start-hbase.sh
Start the HBase shell from the /usr/local/hbase directory: ./bin/hbase shell

In HBase there is no separate step of creating a database; you simply create tables directly. The table we need to create looks like the figure below:


[Figure: the 'student' table, with row keys 1 and 2 and a column family 'info' containing the columns 'name', 'gender', and 'age']

In the create command, the table name 'student' comes first, followed by the column family name 'info'. The column family 'info' will hold three columns: 'name', 'gender', and 'age'.

hbase> create 'student','info'

# In real applications, data is usually written programmatically rather than by hand
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'

# View all the data
hbase> scan 'student'
ROW                   COLUMN+CELL                                               
 1                    column=info:age, timestamp=1533989802237, value=23        
 1                    column=info:gender, timestamp=1533989791899, value=F      
 1                    column=info:name, timestamp=1533989756823, value=Xueqian  
 2                    column=info:age, timestamp=1533989854387, value=24        
 2                    column=info:gender, timestamp=1533989840141, value=M      
 2                    column=info:name, timestamp=1533989825528, value=Weiliang 
2 row(s) in 0.0350 seconds

Step 2. Configure Spark
(1) Copy a few jar files from HBase's lib directory into Spark:

Specifically, all jar files whose names start with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar, and protobuf-java-2.5.0.jar. Open a terminal and run the following commands:

cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
(2) Spark 2.0 does not ship with the jar that converts HBase data into a form Python can read, so we need to download it separately:

Download spark-examples*.jar from its download page; after downloading, put this jar into /usr/local/spark/jars/hbase/:

mkdir -p /usr/local/spark/jars/hbase/
mv ~/下载/spark-examples* /usr/local/spark/jars/hbase/   # ~/下载 is the Downloads folder on a Chinese-locale desktop
(3) Edit Spark's spark-env.sh file to tell Spark where to find the HBase-related jar files:
cd /usr/local/spark/conf
vim spark-env.sh

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
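With SPARK_DIST_CLASSPATH set, restart pyspark so that the HBase jars are picked up. The examples in Step 3 assume the interactive pyspark shell, where sc is already defined; if you would rather run them as a standalone script submitted with spark-submit, a minimal sketch for creating sc yourself (the application name is arbitrary) looks like this:

from pyspark import SparkConf, SparkContext

# Create the SparkContext that the read/write examples below refer to as sc
spark_conf = SparkConf().setAppName("SparkOperateHBase").setMaster("local")
sc = SparkContext(conf=spark_conf)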

Step 3. Write programs to read and write HBase data

Reading data:

host = 'localhost'
table = 'student'
# HBase connection settings: the ZooKeeper quorum and the table to read from
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                               "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                               "org.apache.hadoop.hbase.client.Result",
                               keyConverter=keyConv, valueConverter=valueConv, conf=conf)
hbase_rdd.cache()
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
 
1 {"qualifier" : "age", "timestamp" : "1512549772307", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1512549765192", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1512549757406", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1512549829145", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}
{"qualifier" : "gender", "timestamp" : "1512549790422", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1512549780044", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}               

Writing data:

host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

# Each element is 'rowKey,columnFamily,qualifier,value'
rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
# The converter expects (rowKey, [rowKey, columnFamily, qualifier, value]) pairs
sc.parallelize(rawData).map(lambda x: (x[0], x.split(','))).saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
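Writing several columns for the same row simply means emitting one (rowKey, [rowKey, columnFamily, qualifier, value]) pair per cell. A sketch reusing the conf, keyConv, and valueConv defined above (the gender and age values here are made up purely for illustration):

moreData = ['3,info,gender,M', '3,info,age,26',
            '4,info,gender,F', '4,info,age,27']
sc.parallelize(moreData).map(lambda x: (x[0], x.split(','))).saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)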

Scan the student table again in the hbase shell to confirm that the new data has been added to the table.


[Figure: output of scan 'student' showing the newly written rows 3 and 4]