// -------------- Assemble XML and capture exceptions -------------------
package wondersgroup_0628.com
import java.io.{IOException, PrintWriter, StringReader, StringWriter}
import java.util.Base64
import com.wonders.TXmltmp
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.mapred.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.xml.sax.{InputSource, SAXException}
/**
 * Spark job that reads `||`-delimited text records, assembles an XML document
 * from the first two (Base64-encoded) fields of each record, checks that the
 * XML parses, and writes the outcome to the HBase table `hbaseTest`, keyed by
 * the third field:
 *
 *  - on success the XML text is stored in column `cf2:age`;
 *  - on a `SAXException` or `IOException` the stack trace is stored in
 *    column `cf2:name`.
 */
object TestTest_3 {

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the path of the input text file handed to
   *             `SparkContext.textFile`
   * @throws IllegalArgumentException if no input path is supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast, before a SparkContext is created, when the path is missing.
    require(args.nonEmpty, "usage: TestTest_3 <input-path>")

    val sparkConf = new SparkConf().setAppName("TextTeset_3")
    val sc = new SparkContext(sparkConf)
    try {
      // val dataText = "/user/hdfs/test/rdd_1000000.dat"
      val records = sc.textFile(args(0))
        .map(_.split("\\|\\|"))
        .map(fields => (fields(0), fields(1), fields(2)))

      records.foreachPartition(rows => writePartition(rows))
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }

  /**
   * Writes every record of one partition to HBase through a single table
   * handle. Puts are buffered client-side and flushed once at the end of the
   * partition; the handle is closed even if processing fails.
   *
   * @param rows records as (Base64 first part, Base64 second part, row key)
   */
  private def writePartition(rows: Iterator[(String, String, String)]): Unit = {
    import javax.xml.parsers.DocumentBuilderFactory

    val conf = HBaseConfiguration.create()
    // NOTE(review): COLUMN_LIST comes from the mapred TableInputFormat and looks
    // like a read-side setting; kept unchanged - confirm whether it is needed.
    conf.set(TableInputFormat.COLUMN_LIST, "hbaseTest")
    conf.set("hbase.zookeeper.quorum", "qsmaster,qsslave1,qsslave2")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // conf.addResource("/home/hadoop/data/lib/hbase-site.xml");

    val table = new HTable(conf, "hbaseTest")
    try {
      // Buffer puts on the client instead of sending one RPC per record.
      table.setAutoFlush(false, false)
      table.setWriteBufferSize(5 * 1024 * 1024)

      // Creating the factory is loop-invariant, so do it once per partition.
      val factory = DocumentBuilderFactory.newInstance

      rows.foreach { case (encodedFirst, encodedSecond, rowKey) =>
        try {
          val template = new TXmltmp
          // NOTE(review): new String(bytes) decodes with the platform default
          // charset - confirm the expected encoding of the payload.
          val first = new String(Base64.getDecoder.decode(encodedFirst))
          val second = new String(Base64.getDecoder.decode(encodedSecond))
          val xml = template.load(first, second)

          // Parsed only to validate the XML; the resulting document is not used.
          factory.newDocumentBuilder.parse(new InputSource(new StringReader(xml)))

          val put = new Put(Bytes.toBytes(rowKey))
          put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("age"), Bytes.toBytes(xml))
          table.put(put)
        } catch {
          // Scala cases do not fall through: both exception types must be named
          // in one pattern, otherwise SAXException would be silently swallowed.
          case ex @ (_: SAXException | _: IOException) =>
            println("found a unknown exception" + ex)
            val put = new Put(Bytes.toBytes(rowKey))
            put.addColumn(
              Bytes.toBytes("cf2"),
              Bytes.toBytes("name"),
              Bytes.toBytes(stackTraceOf(ex)))
            table.put(put)
        }
      }

      // Push whatever is still sitting in the write buffer.
      table.flushCommits()
    } finally {
      table.close()
    }
  }

  /**
   * Renders the stack trace of `error` as a string.
   *
   * @param error the throwable to render
   * @return the text `error.printStackTrace` would print
   */
  private def stackTraceOf(error: Throwable): String = {
    val buffer = new StringWriter()
    val writer = new PrintWriter(buffer)
    try error.printStackTrace(writer)
    finally writer.close()
    buffer.toString
  }
}