Spark Streaming real-time writes to HBase (the saveAsNewAPIHadoopDataset method)
2019-03-15
Copyright notice: original article; please credit the source when reposting: https://blog.csdn.net/xianpanjia4616/article/details/85301998

In an earlier post I covered batching Spark Streaming writes into HBase; this time the focus is writing to HBase through the new Hadoop API (saveAsNewAPIHadoopDataset). The full code follows, after a quick note on the target table.
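The streaming job below writes into an HBase table named test with a single column family f1, so the table has to exist before the job starts. If it does not, something along these lines can create it first (a minimal sketch against the standard HBase 1.x client API; the object name and the ZooKeeper quorum are placeholders, not part of the original code):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "your-zk-host")    // placeholder quorum
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("test")
    if (!admin.tableExists(table)) {
      val descriptor = new HTableDescriptor(table)
      descriptor.addFamily(new HColumnDescriptor("f1"))   // same family the streaming job writes to
      admin.createTable(descriptor)
    }
    admin.close()
    connection.close()
  }
}

With the table in place, here is the full job: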

package hbase

import kafka.PropertiesScalaUtils
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import spark.wordcount.kafkaStreams

/**
  * Spark Streaming writing to HBase through the new Hadoop API.
  */
object sparkToHbase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hbase Test")
    val scc = new StreamingContext(conf, Seconds(1))
    val sc = scc.sparkContext
    val tablename = "test"
    val mode = args(0).toString
    val zk_hbase = PropertiesScalaUtils.loadProperties("zk_hbase",mode)
    val zk_port = PropertiesScalaUtils.loadProperties("zk_port",mode)
    val hbase_master = PropertiesScalaUtils.loadProperties("hbase_master",mode)
    val hbase_rootdir = PropertiesScalaUtils.loadProperties("hbase_rootdir",mode)
    val zookeeper_znode_parent = PropertiesScalaUtils.loadProperties("zookeeper_znode_parent",mode)
    val topic = PropertiesScalaUtils.loadProperties("topic_combine",mode)
    val broker = PropertiesScalaUtils.loadProperties("broker",mode)

    // HBase / ZooKeeper connection settings go into the SparkContext's Hadoop configuration
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", zk_hbase)
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", zk_port)
    sc.hadoopConfiguration.set("hbase.master", hbase_master)
    sc.hadoopConfiguration.set("hbase.defaults.for.version.skip", "true")
    sc.hadoopConfiguration.set("hbase.rootdir", hbase_rootdir)
    sc.hadoopConfiguration.set("zookeeper.znode.parent", zookeeper_znode_parent)
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // The Job only carries the output format configuration for saveAsNewAPIHadoopDataset
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",                      // latest or earliest
      "value.deserializer" -> classOf[StringDeserializer],  // key/value deserializers
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> broker,
      "group.id" -> "jason_test",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    try {
      kafkaStreams.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          // Turn every Kafka record into an (ImmutableBytesWritable, Put) pair
          val save_rdd = rdd.map(x => {
            val json = JSONObject.fromObject(x.value())
            val put = new Put(Bytes.toBytes(json.get("rowkey").toString))
            insert_hb(json, put)
            (new ImmutableBytesWritable, put)
          })
          save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
        }
      })
    } catch {
      case e: Exception => println("failed to write to HBase: " + e)
    }
    scc.start()
    scc.awaitTermination()
  }

  // Copy every field of the JSON object into column family "f1" on the given Put
  def insert_hb(json: JSONObject, onePut: Put): Unit = {
    val keys = json.keySet
    val iterator_redis = keys.iterator
    while (iterator_redis.hasNext) {
      val hb_col = iterator_redis.next().toString
      val col_value = json.get(hb_col).toString
      onePut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes(hb_col), Bytes.toBytes(col_value))
    }
  }
}
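The heart of the write path is the map inside foreachRDD: every record becomes an (ImmutableBytesWritable, Put) pair, and saveAsNewAPIHadoopDataset hands those pairs to TableOutputFormat using the Job's configuration, so there is no need to open HBase connections by hand inside foreachPartition. Stripped of Kafka and streaming, the same write looks like this (a minimal batch sketch reusing the sc and job objects from the code above; the rows and the "col" qualifier are made up for illustration):

val demo = sc.parallelize(Seq("row1" -> "hello", "row2" -> "world"))
val puts = demo.map { case (rowkey, value) =>
  val put = new Put(Bytes.toBytes(rowkey))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("col"), Bytes.toBytes(value))
  (new ImmutableBytesWritable, put)   // the key is ignored by TableOutputFormat; the Put carries the row key
}
puts.saveAsNewAPIHadoopDataset(job.getConfiguration())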

If anything here is wrong, corrections are welcome. If you have questions, feel free to join QQ group 340297350. Thanks!

