设为首页 加入收藏

TOP

Spark 操作Hbase 对表的操作:增删改查   scala
2019-01-06 13:51:53 】 浏览:47
Tags:Spark 操作 Hbase 删改   scala

在build.sbt中配置依赖(各设置项之间需要用空行分隔)

// sbt build definition for the HBaseTest example.
// Settings are separated by blank lines, as older sbt releases require in .sbt files.

name := "test2"

scalaVersion := "2.10.4"

// NOTE(review): the example's header comment says it targets hbase0.98.9 on hadoop1.2.1;
// the "1.2.1-hadoop1" HBase version below looks like the Hadoop version was used by
// mistake ("0.98.9-hadoop1" would match the header) -- confirm before relying on it.
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core" % "1.0.0",
  "org.apache.hbase" % "hbase" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-client" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-common" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-server" % "1.2.1-hadoop1"
)

version := "1.0"


在环境变量中配置该工程需要的JAR包,命名为SPARK_TEST_JAR。

————————————————————————————————————————

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.spark._
/**
 * Created by gongxuan on 2/3/15.
 * hadoop1.2.1 scala2.10.4 hbase0.98.9 spark1.0.0
 *
 * End-to-end HBase example: (re)creates table "test1" with one column family "id",
 * writes ten rows through the HBase client, reads them back with Get, scans the
 * table from Spark via TableInputFormat, and finally drops the table.
 */
object HBaseTest {

  /**
   * Runs the whole create / put / get / scan / drop sequence.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val tableName = "test1"
    val columnFamily = "id"

    val conf = HBaseConfiguration.create
    val admin = new HBaseAdmin(conf)
    try {
      // Start from a clean slate: drop any table left over from a previous run.
      if (admin.tableExists(tableName)) {
        admin.disableTable(tableName)
        admin.deleteTable(tableName)
      }

      // Create the table with a single column family.
      val htd = new HTableDescriptor(tableName)
      htd.addFamily(new HColumnDescriptor(columnFamily))
      admin.createTable(htd)

      putAndGetRows(conf, htd.getName, columnFamily)
      scanWithSpark(tableName)

      // Drop the table again (only reached when every step above succeeded,
      // matching the original flow).
      admin.disableTable(tableName)
      admin.deleteTable(tableName)
    } finally {
      // Release the admin connection even when a step above throws.
      admin.close()
    }
  }

  /**
   * Writes rows "row1".."row10" into the given table and prints each row as
   * fetched back with a Get. Row c stores the cell family:c = "value" + c.
   *
   * @param conf      HBase configuration used to open the table
   * @param tableName table name as bytes
   * @param family    column family to write into
   */
  private def putAndGetRows(conf: org.apache.hadoop.conf.Configuration,
                            tableName: Array[Byte],
                            family: String): Unit = {
    val table = new HTable(conf, tableName)
    try {
      val familyBytes = Bytes.toBytes(family)
      for (c <- 1 to 10) {
        val put = new Put(Bytes.toBytes("row" + c.toString))
        put.add(familyBytes, Bytes.toBytes(c.toString), Bytes.toBytes("value" + c.toString))
        table.put(put)
      }
      for (c <- 1 to 10) {
        val get = new Get(Bytes.toBytes("row" + c.toString))
        println("Get:" + table.get(get))
      }
    } finally {
      // Close the table handle even if a put/get fails.
      table.close()
    }
  }

  /**
   * Scans the given table through Spark using the Hadoop TableInputFormat,
   * prints the row count and then every cell as "row / cf / column / value".
   *
   * @param tableName name of the HBase table to scan
   */
  private def scanWithSpark(tableName: String): Unit = {
    val config = HBaseConfiguration.create
    config.set(TableInputFormat.INPUT_TABLE, tableName)

    // Option(...) drops the jar entry when SPARK_TEST_JAR is unset instead of
    // handing Spark a Seq(null).
    val sc = new SparkContext("local", "HBaseTest",
      System.getenv("SPARK_HOME"), Option(System.getenv("SPARK_TEST_JAR")).toSeq)
    try {
      // Build an RDD of (row key, Result) pairs with the Hadoop input-format API.
      val hbaseRDD = sc.newAPIHadoopRDD(config, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Mark the RDD for caching before the first action so that count()
      // populates the cache and take() below can reuse it.
      hbaseRDD.cache()

      val count = hbaseRDD.count()
      println("HbaseRDD Count:" + count)

      // Each element is (ImmutableBytesWritable, Result); _2 is the Result.
      val res = hbaseRDD.take(count.toInt)
      for (pair <- res; kv <- pair._2.raw) {
        // Result.raw yields the row's KeyValue cells. Decode with Bytes.toString
        // (UTF-8, the inverse of Bytes.toBytes) rather than the platform charset.
        println("row:" + Bytes.toString(kv.getRow()) +
          " cf:" + Bytes.toString(kv.getFamily()) +
          " column:" + Bytes.toString(kv.getQualifier()) +
          " value:" + Bytes.toString(kv.getValue()))
      }
    } finally {
      // Always shut the local Spark context down.
      sc.stop()
    }
  }
}

编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark 操作Hbase 对表的操作:.. 下一篇[HBase进阶]--HBase最佳实践之HBa..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }