TOP

sparkstreaming实时写入Hbase(saveAsNewAPIHadoopDataset方法)
2019-03-15 01:44:21 】 浏览:411
Tags:sparkstreaming 实时 写入 Hbase saveAsNewAPIHadoopDataset 方法

版权声明:原创文章,转载请注明出处 https://blog.csdn.net/xianpanjia4616/article/details/85301998

之前的博客中也写过 Spark Streaming 批量写入 HBase,今天主要介绍一下如何用新的 API(saveAsNewAPIHadoopDataset)写入 HBase,直接看代码吧:

package hbase

import kafka.PropertiesScalaUtils
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import spark.wordcount.kafkaStreams

/**
  * Spark Streaming job that consumes JSON records from Kafka and writes them to HBase
  * through the new Hadoop output API (`saveAsNewAPIHadoopDataset`).
  */
object sparkToHbase {
  import scala.util.Try
  import scala.util.control.NonFatal

  /**
    * Entry point: configures the HBase output, subscribes to the Kafka topic and, for every
    * non-empty micro-batch, converts each record into a `Put` and saves the batch to HBase.
    *
    * @param args `args(0)` is the mode handed to `PropertiesScalaUtils.loadProperties`
    *             (presumably selects the environment/profile - confirm against that helper).
    * @throws IllegalArgumentException if no mode argument is supplied.
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of a bare ArrayIndexOutOfBoundsException.
    require(args.nonEmpty, "usage: sparkToHbase <mode>")
    val conf = new SparkConf().setAppName("Hbase Test")
    val scc = new StreamingContext(conf, Seconds(1))
    val sc = scc.sparkContext
    val tablename = "test"
    val mode = args(0)
    val zk_hbase = PropertiesScalaUtils.loadProperties("zk_hbase", mode)
    val zk_port = PropertiesScalaUtils.loadProperties("zk_port", mode)
    val hbase_master = PropertiesScalaUtils.loadProperties("hbase_master", mode)
    val hbase_rootdir = PropertiesScalaUtils.loadProperties("hbase_rootdir", mode)
    val zookeeper_znode_parent = PropertiesScalaUtils.loadProperties("zookeeper_znode_parent", mode)
    val topic = PropertiesScalaUtils.loadProperties("topic_combine", mode)
    val broker = PropertiesScalaUtils.loadProperties("broker", mode)

    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("hbase.zookeeper.quorum", zk_hbase)
    hadoopConf.set("hbase.zookeeper.property.clientPort", zk_port)
    hadoopConf.set("hbase.master", hbase_master)
    hadoopConf.set("hbase.defaults.for.version.skip", "true")
    // The original key was misspelled "hhbase.rootdir", so the setting was never picked up.
    hadoopConf.set("hbase.rootdir", hbase_rootdir)
    hadoopConf.set("zookeeper.znode.parent", zookeeper_znode_parent)
    hadoopConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(hadoopConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // Resolve the output configuration once, up front, rather than on every batch.
    val outputConf = job.getConfiguration()

    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest", // either "latest" or "earliest"
      "value.deserializer" -> classOf[StringDeserializer], // deserializers for key and value
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> broker,
      "group.id" -> "jason_test",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // NOTE(review): this assigns to the imported `spark.wordcount.kafkaStreams` rather than a
    // local value; kept as-is because other code may read that member - confirm and localize.
    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    kafkaStreams.foreachRDD(rdd => {
      // The try/catch lives inside the batch function: foreachRDD itself only registers the
      // output operation, so a handler wrapped around that call never sees a batch failure.
      try {
        if (!rdd.isEmpty()) {
          val puts = rdd.flatMap(record => toPut(record.value()).toSeq)
          puts.saveAsNewAPIHadoopDataset(outputConf)
        }
      } catch {
        case NonFatal(e) =>
          // Best-effort: report the failed batch with its cause and keep the stream running.
          println(s"报错了: $e")
          e.printStackTrace()
      }
    })
    scc.start()
    scc.awaitTermination()
  }

  /**
    * Converts one Kafka record value into an HBase write.
    *
    * @param value the record value, expected to be a JSON object containing a "rowkey" field.
    * @return the key/`Put` pair to save, or `None` when the value cannot be parsed as JSON or
    *         has no usable (non-empty) "rowkey", so one bad record does not fail the batch.
    */
  private def toPut(value: AnyRef): Option[(ImmutableBytesWritable, Put)] =
    for {
      json <- Try(JSONObject.fromObject(value)).toOption
      rowkey <- Try(Option(json.get("rowkey"))).toOption.flatten.map(_.toString)
      if rowkey.nonEmpty
    } yield {
      val put = new Put(Bytes.toBytes(rowkey))
      insert_hb(json, put)
      (new ImmutableBytesWritable, put)
    }

  /**
    * Adds every key/value pair of `json` to `onePut` as a column in family "f1": the JSON key
    * becomes the column qualifier and the value's string form becomes the cell value.
    *
    * @param json   source JSON object; all of its keys are written, including "rowkey".
    * @param onePut the `Put` that receives the columns (mutated in place).
    */
  def insert_hb(json: JSONObject, onePut: Put): Unit = {
    val family = Bytes.toBytes("f1")
    val columns = json.keySet.iterator
    while (columns.hasNext) {
      val hb_col = columns.next().toString
      val col_value = json.get(hb_col).toString
      onePut.addColumn(family, Bytes.toBytes(hb_col), Bytes.toBytes(col_value))
    }
  }
}

如果有写得不对的地方,欢迎大家指正;如果有什么疑问,可以加QQ群:340297350,谢谢。


sparkstreaming实时写入Hbase(saveAsNewAPIHadoopDataset方法) https://www.cppentry.com/bencandy.php?fid=118&id=213199

【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇HBase的rowkey排序和scan输出顺序 下一篇【HBase基础教程】7、HBase之读取..