设为首页 加入收藏

TOP

使用spark将数据写入Hbase
2019-02-15 01:44:31 】 浏览:75
Tags:使用 spark 数据 写入 Hbase
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012719230/article/details/81699987
--------------组装xml并捕获异常-------------------
package wondersgroup_0628.com

import java.io.{IOException, PrintWriter, StringReader, StringWriter}
import java.util.Base64

import com.wonders.TXmltmp
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.mapred.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.xml.sax.{InputSource, SAXException}

object TestTest_3 {
  /**
   * Spark job: reads "||"-delimited lines from the input path in args(0),
   * where each line is (base64 xml part 1, base64 xml part 2, HBase row key).
   * The two decoded parts are assembled into an XML document via TXmltmp,
   * validated with a SAX parse, and written to HBase table "hbaseTest",
   * column family "cf2", qualifier "age". If assembly/parsing fails, the
   * exception stack trace is written to the same row under qualifier "name"
   * instead, so failed rows are never silently dropped.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("TextTeset_3")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.textFile(args(0))
    // Each input line: <base64 part 1>||<base64 part 2>||<rowkey>
    val data = rdd.map(_.split("\\|\\|")).map(x => (x(0), x(1), x(2)))

    // foreachPartition returns Unit (no useful value to bind); one HBase
    // connection/table handle is created per partition, on the executor.
    data.foreachPartition { partition =>
      val conf = HBaseConfiguration.create()
      conf.set(TableInputFormat.COLUMN_LIST, "hbaseTest")
      conf.set("hbase.zookeeper.quorum", "qsmaster,qsslave1,qsslave2")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val table = new HTable(conf, "hbaseTest")
      try {
        // Buffer puts client-side and flush once per partition; calling
        // flushCommits after every put would defeat the 5 MB write buffer.
        table.setAutoFlush(false, false)
        table.setWriteBufferSize(5 * 1024 * 1024)
        partition.foreach { record =>
          try {
            val tmp = new TXmltmp
            val j1 = new String(Base64.getDecoder.decode(record._1))
            val j2 = new String(Base64.getDecoder.decode(record._2))
            val xml = tmp.load(j1, j2)

            // Validate the assembled XML before writing; a parse failure
            // throws SAXException/IOException and is handled below.
            import javax.xml.parsers.DocumentBuilderFactory
            val factory = DocumentBuilderFactory.newInstance
            val builder = factory.newDocumentBuilder
            builder.parse(new InputSource(new StringReader(xml)))

            val put = new Put(Bytes.toBytes(record._3))
            put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("age"), Bytes.toBytes(xml))
            table.put(put)
          } catch {
            // Handle BOTH exception types: the original code left the
            // SAXException case empty, silently dropping malformed rows.
            case ex @ (_: SAXException | _: IOException) =>
              println("found a unknown exception" + ex)
              val sw = new StringWriter()
              val pw = new PrintWriter(sw)
              ex.printStackTrace(pw)
              pw.close()
              val error = sw.getBuffer.toString
              sw.close()
              // Persist the failure alongside the row key for later triage.
              val put = new Put(Bytes.toBytes(record._3))
              put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("name"), Bytes.toBytes(error))
              table.put(put)
          }
        }
        // Single flush drains the buffered puts for this partition.
        table.flushCommits()
      } finally {
        // Always release the HBase table handle, even if the partition fails.
        table.close()
      }
    }
    sc.stop()
  }
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase Coprocessors机制 下一篇华为HBase调优

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目