版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzq294328238/article/details/45099835
引言
Apache HBase v1.0 发布了,这是 HBase 一个主要的里程碑。
值得我们注意的是,hbase1.0推出了全新的 API 以及重新组织客户端 API,被标注deprecated的api将于2.0版本去除。
本文内容大部分基于Hbase源码官方文档及网络博客
User API Reference
HBase Refguide
所有代码基于spark1.3、hbase1.0和hadoop2.6
环境配置
首先操作hbase需要如下jar包:
1. guava- 12.0 .1 . jar
2. hbase-client - 1.0 .0 . jar
3. hbase-common - 1.0 .0 . jar
4. hbase-prefix -tree - 1.0 .0 . jar
5. hbase-protocol - 1.0 .0 . jar
6. hbase-server - 1.0 .0 . jar
7. htrace-core - 3.1 .0 -incubating . jar
凑齐这七颗jar包我们就可以操作hbase了。
首先,将这些jar包及文件放在同一的目录中
然后,在spark-default中设置
spark.driver.extraClassPath=(jar包目录,就是你当前系统文件系统目录,不是hdfs)
当然如果是集群运行的话还要配置:
spark.executor.extraClassPath=(jar包目录,注意要保证你的每台executor机器上都存在这个目录,不然会报错)
下面是我的设置:
spark.executor.extraClassPath=/home/hadoop/wzq_workspace/lib /*
spark.driver.extraClassPath=/home/hadoop/wzq_workspace/lib /*
这样做的好处是,不用每次提交任务都要–jars,感觉很麻烦。
spark-shell中小试牛刀
配置好上面的几个步骤后,用你喜欢的三个方式之一(local,standalone,yarn)打开spark-shell都可以。
键入:
import org.apache .hadoop .hbase .{HBaseConfiguration, TableName}
import org.apache .hadoop .hbase .client ._
如果成功import的话,说明你的classpath配置成功了。接下来我们来连接hbase。
val myConf = HBaseConfiguration.create ()
myConf.set ("hbase.zookeeper.quorum" ,"master.hadoop,slave1.hadoop,slave2.hadoop" )
根据hbase架构,设置zookeeper就可以找到master的地址了。
或者网上有人说把hbase-site.xml放到classpath中,不然程序获取不到集群的信息。但是这样还是觉得麻烦些。
我这里是设置了三节点zookeeper,改成你集群上课用的zookeeper域名就可以了。
最后就是见证奇迹的时刻:(获取hbase连接)
val connect = ConnectionFactory.createConnection(myConf)
如果返回如下结果就说明成功了:
res2: org.apache .hadoop .hbase .client .Connection = hconnection-0x3d3b59a4
下面开始用API来操作hbase
创建表
import org.apache .hadoop .hbase ._
import org.apache .hadoop .hbase .client ._
val myConf = HBaseConfiguration.create ()
//标注1
myConf.set ("hbase.zookeeper.quorum" ,"master.hadoop,slave1.hadoop,slave2.hadoop" )
//利用工厂方法进行创建,使用RPC调用返回
val connection = ConnectionFactory.createConnection (myConf)
val admin = connection.getAdmin
try {
//创建命名空间
admin.createNamespace (NamespaceDescriptor.create ("wzq" ).build ())
val tableDesc = new HTableDescriptor(TableName.valueOf (tableName))
tableDesc.setDurability (Durability.FSYNC _WAL)
//创建两列列簇,分别是columFamliy和columCompany
val coluemFamliyDes = new HColumnDescriptor("columFamliy" )
val coluemCompanyDes = new HColumnDescriptor("columCompany" )
tableDesc.addFamily (coluemFamliyDes)
admin.createTable (tableDesc)
}finally{
admin.close ()
connection.close ()
}
虽说用api可以创建表,但个人还是推荐用hbase shell来进行。
从表中读数据到RDD
首先:
import org.apache .hadoop .hbase .{HBaseConfiguration, TableName}
import org.apache .hadoop .hbase .client ._
import org.apache .hadoop .hbase .mapreduce .TableInputFormat
接下来:
val conf = HBaseConfiguration.create ()
conf.set ("hbase.zookeeper.property.clientPort" , "2181" )
conf.set ("hbase.zookeeper.quorum" ,"master.hadoop,slave1.hadoop,slave2.hadoop" )
//我们还可以制定范围
conf.set (TableInputFormat.SCAN _ROW_START, "000000" )
conf.set (TableInputFormat.SCAN _ROW_STOP, "666666" )
val rdd = sc.newAPIHadoopRDD (conf,classOf[TableInputFormat],classOf[org.apache .hadoop .hbase .io .ImmutableBytesWritable ],classOf[org.apache .hadoop .hbase .client .Result ])
rdd.count
将RDD批量插入HBase
首先配置:
val conf = HBaseConfiguration.create ()
conf.set ("hbase.zookeeper.property.clientPort" , "2181" )
conf.set ("hbase.zookeeper.quorum" , "master.hadoop,slave1.hadoop,slave2.hadoop" )
其次创建一个JobConf
val jobConf = new JobConf(conf, this.getClass )
jobConf.setOutputFormat (classOf[TableOutputFormat])
jobConf.set (TableOutputFormat.OUTPUT _TABLE, table)
接下来就需要将RDD转化为RDD[(ImmutableBytesWritable,org.apache.hadoop.hbase.client.Put)]
val rdd = sc.textFile("*" )
val insertRDD = rdd.map(data =>{
val put = new Put (Bytes .tpBytes ("Key "))
put .addColumn (*)
******
put .addColumn (*)
put .setDurability (Durability .ASYNC_WAL )
(new ImmutableBytesWritable ,put )
} )
接下来我们就可以调用saveAsHadoopDataset方法了
insertRDD.saveAsHadoopDataset (jobConf)
增删改查
//获得表
val yourTable = conn.getTable (Table)
//插入数据,指定键值
val p = new Put("0001" .getBytes )
//添加插入的内容,制定列簇,和列名以及内容
p.addColumn ("Columfamliy" .getBytes ,"Column" .getBytes , "content" .getBytes )
//提交
yourTable.put (p)
//查询数据
val g = new Get("0001" .getBytes )
val result = yourTable.get (g)
//指定想要查询的列簇列名
val value = Bytes.toString (result.getValue ("Columfamliy" .getBytes ,"Column" .getBytes ))
println("content of 0001 :" +value)
//扫描数据
val s = new Scan()
s.addColumn ("Columfamliy" .getBytes ,"Column" .getBytes )
val scanner = yourTable.getScanner (s)
for(r <- scanner){
println("value: " + Bytes.toString (
r.getValue ("Columfamliy" .getBytes ,"Column" .getBytes )))
}
scanner.close ()
//删除数据
val d = new Delete("0001" .getBytes )
d.addColumn ("Columfamliy" .getBytes ,"Column" .getBytes )
yourTable.delete (d)
//关闭连接
if(table != null) yourTable.close ()
conn.close ()