设为首页 加入收藏

TOP

spark-structstreaming-结果数据存入hbase
2018-12-22 13:49:20 】 浏览:134
Tags:spark-structstreaming- 结果 数据 存入 hbase
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_34969081/article/details/78983857

前言

本节描述通过spark-structstreaming将结果结果数据存入hbase

正文

object testWriteResultToHbase{
def main(args:Array[String]){

  val kafkaservers=args(0)
  val topic=args(1)     
  val zookeeperservers=args(2)
   val tablename=args(3)
   val spark=SparkSession
  .builder
  .appName("testWriteResultToHbase")
  .master("local")
  .getOrCreate()

  import spark.implicits._
   val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",kafkaservers)
  .option("subscribe", topic)
  .load()
  .selectExpr("cast(topic as String) ","cast(key as String)","CAST(value AS STRING)")
  .as[(String,String,String)]
   lines.createTempView("Originalkafka")
  import spark.sql
  val count=sql("select value from Originalkafka ")
  val query =count.writeStream
  .outputMode("append")
  .foreach(new ForeachWriter[Row]{

    var connection: Connection= null
      def open(partitionId:Long,version:Long):Boolean={
        true
      }
      def process(record:Row):Unit={
    val conf = new HBaseConfiguration()
    conf.set("hbase.zookeeper.quorum",zookeeperservers)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent","/hbase-unsecure")
          val table = new HTable(conf, tablename)
          val theput= new Put(Bytes.toBytes(record.mkString))
         theput.add(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes("30"))
         table.put(theput)
      }
      def close(errorOrNull:Throwable):Unit={
      }
  })
  .queryName("test")
  .format("foreach")      
  .start()
 query.awaitTermination()
 }

}

注意:写入hbase的时候,没有创建连接的客户端ConnectionFactory.createConnection(conf)
所以就没有必要在open(partitionId:Long,version:Long)close(errorOrNull:Throwable)打开连接和关闭连接。
但是本人认为还是有必要的。空了研究一下hbase 客户端源码

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop、spark、Hbase、Hive、hdf.. 下一篇Java 连接HBASE ,执行查询超时的..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目