设为首页 加入收藏

TOP

Spark Streaming 插入 HBase
【2018-12-07 01:45:03】 浏览:44
Tags:spark streaming 插入 hbase
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/ggz631047367/article/details/50807840
import java.sql.{DriverManager, ResultSet}

import org.apache.spark._
import org.apache.spark.streaming._

import scala.util.Random

import org.apache.hadoop.hbase.{HTableDescriptor,HColumnDescriptor,HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put,Table}



/**
 * Spark Streaming sample: reads text lines from a socket, splits each line on '|',
 * and writes the first two fields into an HBase table.
 *
 * For every record, field 0 is used as the column qualifier and field 1 as the cell
 * value (see the call to [[InsertHbase]] in [[main]]). One HBase connection is opened
 * per RDD partition and is always released, even when processing fails.
 */
object Pi {

  // Local imports keep this object self-contained.
  import java.nio.charset.StandardCharsets.UTF_8
  import scala.util.control.NonFatal

  // MySQL settings. They are only referenced by the disabled JDBC sample at the end of main.
  val user = "root"
  val password = "root"
  val host = "10.8.8.123"
  val database = "db_1"
  val port = 3306
  val conn_str = "jdbc:mysql://" + host + ":" + port + "/" + database

  // HBase target table and column family.
  val tablename = "achi"
  val cf = "a"
  // Not referenced elsewhere in this object: main takes the qualifier from the input record.
  val qulified = "name"

  /**
   * Creates `userTable` with the single column family [[cf]] unless it already exists.
   *
   * The Admin handle obtained from the connection is closed before returning.
   * Note that [[main]] does not call this method.
   *
   * @param conn      open HBase connection; it is not closed by this method
   * @param userTable name of the table to check or create
   */
  def CreatTableIfNotFind(conn: Connection, userTable: TableName): Unit = {
    // Obtain the Admin object from the Connection (the replacement for the former HBaseAdmin).
    val admin = conn.getAdmin
    try {
      if (admin.tableExists(userTable)) {
        println("Table exists!")
        // To recreate the table instead, disable and delete it here:
        //   admin.disableTable(userTable)
        //   admin.deleteTable(userTable)
      } else {
        val tableDesc = new HTableDescriptor(userTable)
        tableDesc.addFamily(new HColumnDescriptor(cf.getBytes(UTF_8)))
        admin.createTable(tableDesc)
        println("Create table success!")
      }
    } finally {
      admin.close()
    }
  }

  /**
   * Writes one cell into `table`.
   *
   * The row key is the fixed literal "id001", so every call targets the same row.
   * All strings are encoded as UTF-8 so the stored bytes do not depend on the JVM's
   * default charset.
   *
   * @param table    table to write to
   * @param cf       column family
   * @param qulified column qualifier
   * @param value    cell value
   */
  def InsertHbase(table: Table, cf: String, qulified: String, value: String): Unit = {
    val p = new Put("id001".getBytes(UTF_8))
    p.addColumn(cf.getBytes(UTF_8), qulified.getBytes(UTF_8), value.getBytes(UTF_8))
    table.put(p)
  }

  /**
   * Entry point: starts a local streaming context with 3-second batches that reads
   * lines from localhost:9999 and stores each '|'-separated record in HBase.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.map(_.split('|'))
    words.print()

    words.foreachRDD { rdd =>
      rdd.foreachPartition { pa =>
        // The connection is created inside the partition closure, once per partition.
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        try {
          val table = conn.getTable(TableName.valueOf(tablename))
          try {
            pa.foreach { w =>
              // A failure on one record (e.g. a line with fewer than two fields)
              // is reported and does not abort the rest of the partition.
              try {
                val beg = System.currentTimeMillis()
                println(w(0) + w(1))
                InsertHbase(table, cf, w(0), w(1))
                println("***************************************************************")
                println(" 耗时: " + (System.currentTimeMillis() - beg) + "ms")
                println("***************************************************************")
              } catch {
                case NonFatal(e) => println("raw error! " + e)
              }
            }
          } finally {
            table.close()
          }
        } finally {
          conn.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()

    /* Disabled JDBC sample, kept for reference:
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    val conn1 = DriverManager.getConnection(conn_str,user,password)
    try {
      val statement = conn1.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      val rs = statement.executeQuery("select * from achi limit 10")
      while (rs.next) {
        println(rs.getString(1))
      }
    }
    catch {
      case _ : Exception => println("===>")
    }
    finally {
      conn1.close
    }
    */
  }
}
// sbt build definition for the Spark Streaming -> HBase sample.

name := "untitled"

version := "1.0"

scalaVersion := "2.10.6"

// Versions shared by several artifacts, declared once so they stay in sync.
val sparkVersion = "1.5.2"

val hbaseVersion = "1.1.3"

libraryDependencies ++= Seq(
  // JDBC driver
  "mysql" % "mysql-connector-java" % "5.1.38",
  // Spark
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // HBase
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-common" % hbaseVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion
)

// Additional repository used to resolve the dependencies above.
resolvers += "OS China" at "http://maven.oschina.net/content/groups/public/"

编程开发网
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇:C# 通过Thrift 1 操作 HBase | 下一篇:通过HBase Observer同步数据到Ela..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }