object testWriteResultToHbase{
/**
 * Entry point: streams Kafka records into HBase.
 *
 * Expected args:
 *   args(0) - Kafka bootstrap servers
 *   args(1) - Kafka topic to subscribe to
 *   args(2) - ZooKeeper quorum used by the HBase client
 *   args(3) - target HBase table name
 */
def main(args: Array[String]): Unit = {
  val kafkaservers = args(0)
  val topic = args(1)
  val zookeeperservers = args(2)
  val tablename = args(3)

  // NOTE(review): master is hard-coded to "local"; fine for testing,
  // but it should come from spark-submit for cluster runs.
  val spark = SparkSession
    .builder
    .appName("testWriteResultToHbase")
    .master("local")
    .getOrCreate()
  import spark.implicits._

  val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaservers)
    .option("subscribe", topic)
    .load()
    // Fixed: the original first expression had a trailing space
    // ("cast(topic as String) ") which yields a column name with a
    // trailing blank.
    .selectExpr("cast(topic as String)", "cast(key as String)", "CAST(value AS STRING)")
    .as[(String, String, String)]

  lines.createTempView("Originalkafka")
  import spark.sql
  val count = sql("select value from Originalkafka ")

  val query = count.writeStream
    .outputMode("append")
    .foreach(new ForeachWriter[Row] {
      // One HTable per partition/epoch, created in open() and released in
      // close(). The original built a fresh HBaseConfiguration and HTable
      // for EVERY record and never closed them — a per-row resource leak.
      var table: HTable = null

      def open(partitionId: Long, version: Long): Boolean = {
        val conf = new HBaseConfiguration()
        conf.set("hbase.zookeeper.quorum", zookeeperservers)
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
        table = new HTable(conf, tablename)
        true
      }

      def process(record: Row): Unit = {
        // Row key is the stringified row; assumes keys are unique enough
        // for this test — TODO confirm for production use.
        val theput = new Put(Bytes.toBytes(record.mkString))
        theput.add(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes("30"))
        table.put(theput)
      }

      def close(errorOrNull: Throwable): Unit = {
        // Release the per-partition table; flushes any buffered puts.
        if (table != null) {
          table.close()
          table = null
        }
      }
    })
    .queryName("test")
    .format("foreach")
    .start()

  query.awaitTermination()
}