设为首页 加入收藏

TOP

Spark对Hbase 的封装 connector
2019-02-19 13:44:22 】 浏览:337
Tags:Spark Hbase 封装 connector

传统方式spark写Hbase的方式为这种方式就是常用的TableInputFormat和TableOutputFormat来读写hbase;本文用SparkOnHbase基于GIT上Clouder开源出来的方法,依赖如下:

<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.12.1</version>
</dependency>

另外hortonworks-spark 也有spark和Hbase的connector大家可自行编译但是感觉没有Clouder好用
https://github.com/hortonworks-spark/shc#apache-spark—apache-hbase-connector

华为也有spark-Hbase的封装 https://github.com/Huawei-Spark/Spark-SQL-on-HBase spark-hbase


  val spark=SparkSession.builder().appName("").master("").getOrCreate()
  val sc=spark.sparkContext
  val conf= HBaseConfiguration.create()
  val habsecontext=new HBaseContext(sc,conf)


  def scanHbaseTB(tableName:String)(implicit startKey:Option[String],endKey:Option[String]):RDD[(ImmutableBytesWritable,Result)]={
  //如果有StartRowKey根据提供查询
    startKey match {
      case Some(x)=>{
        val scan=new Scan()
        scan.setStartRow(Bytes.toBytes(x))
        scan.setStopRow(Bytes.toBytes(endKey.getOrElse(x)))
        val hbaeRDD=habsecontext.hbaseRDD(TableName.valueOf(tableName),scan)
        hbaeRDD
      }
      case None=>{
        val scan=new Scan()
        val hbaeRDD=habsecontext.hbaseRDD(TableName.valueOf(tableName),scan)
        hbaeRDD
      }
    }


    def main(args: Array[String]): Unit = {
      //传统方式
      conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey")
      conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")
      conf.set(TableInputFormat.INPUT_TABLE, "SparkHbase")
      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])



      //利用HbaseContext进行操作
      val SparkHbaseRDD=scanHbaseTB("SparkHbase")
      SparkHbaseRDD.foreach(x=>{
        val rowKey=x._1.toString
        val rs=x._2
        val cell=rs.getColumnLatestCell(Bytes.toBytes(""),Bytes.toBytes(""))
        println(s"the rowKey is $rowKey the values is $cell")
      })


    }


  }
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇传统的行存储和(HBase)列存储的.. 下一篇Hbase优化之Region分割设置的问题

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目