kafkaToHbase
2018-12-05 18:37:47
Tags: kafkaToHbase
package kafka


import java.sql.{DriverManager}
import java.text.SimpleDateFormat
import java.util.{Properties, Calendar, Date}

import DAO.{ScalaHbase, ScalaConn}
import kafka.serializer.StringDecoder
import org.apache.commons.lang.time.DateFormatUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.hadoop.hbase.client.{Table, Connection, ConnectionFactory}
import java.util.HashMap
import util.DateUtil

import scala.util.Random
import org.slf4j.LoggerFactory

/**
 * Created by 谭志坚 on 2017/3/16.
 */
object kafkaToHbase {

  def LOG = LoggerFactory.getLogger(getClass)

  val dimension: Int = 5

  case class market_analysis(product: String, source: String, trade_direction: String,
                             trade_type: String, open_time: String, remark: String, price: String)

  def main(args: Array[String]) {
    // Database connection info
    val url = "jdbc:mysql://12.168.55.218:3306/hadoop"
    val user = "root"
    val password = "dzwang**"

    // Run mode: yarn, local, or Spark standalone
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("kafkaToHbase")
    val sc = new SparkContext(sparkConf)
    // One batch every dimension * 60 seconds
    val ssc = new StreamingContext(sc, Seconds(dimension * 60))

    // Read from Kafka; the topic name matches the job name on the Kafka
    // brokers, and the topic has two partitions
    val topics = Set("marketAnalysis")
    // Kafka broker addresses
    // val brokers = "192.168.100.110:9092"
    val brokers = "192.168.100.110:9092,192.168.100.111:9092,192.168.100.112:9092"
    // Fixed Kafka configuration
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("bootstrap.servers", brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "org.apache.kafka.common.serialization.ByteArraySerializer")
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    //.map(x => x._2.split(",")).map(w => market_analysis(w(0), w(1), w(2), w(3), w(4)))

    // Log lines are delimited by |~|
    val events = kafkaStream.flatMap(line => {
      Some(line._2.toString())
    })
    try {
      // Each record needs all 7 comma-separated fields (indexes 0..6)
      val tmpdf = events.map(_.split(",")).filter(_.length >= 7)
        .map(x => market_analysis(x(0), x(1), x(2), x(3), x(4), x(5), x(6)))
      tmpdf.foreachRDD { rdd =>
        // val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        // val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // import sqlContext.implicits._
        val spark = ScalaConn.spark
        import spark.implicits._
        val dataFrame = rdd.toDF()
        // Register as a temp view
        // dataFrame.registerTempTable("market_analysis")
        dataFrame.createOrReplaceTempView("market_analysis")
        // SQL that selects the raw data to back up
        // val bak_sql = sqlContext.sql("select product,source,trade_direction,trade_type,open_time,remark,price from market_analysis")
        val bak_sql = spark.sql("select product,source,trade_direction,trade_type,open_time,remark,price from market_analysis")
        // Min and max open_time of the current batch
        val time_sql = spark.sql("select min(open_time),max(open_time) from market_analysis")
        if (bak_sql.collect().length > 0) {
          //############################# insert or update HBase ################################
          var myConf: Configuration = ScalaConn.getHbaseConf
          var hbaseconn: Connection = ConnectionFactory.createConnection(myConf)
          // val familyColumn: Array[String] = Array[String]("USERS")
          ScalaHbase.createTable(myConf, hbaseconn, "dw_tradevolume_m5", null)
          val hTable: Table = hbaseconn.getTable(TableName.valueOf("dw_tradevolume_m5"))
          // val hTable = new HTable(myConf, TableName.valueOf("dw_tradevolume_m5"))
          // hTable.setAutoFlush(false, false)          // key point 1: disable auto-flush
          // hTable.setWriteBufferSize(3 * 1024 * 1024) // key point 2: client write buffer
          val conn: java.sql.Connection = DriverManager.getConnection(url, user, password)
          // Work out the time windows and compute the final results
          countTime(time_sql, conn, hTable)
          //############################# back up raw data to MySQL ########################################
          insertInitMysql(conn, bak_sql)
          //############################# back up raw data to HDFS (HBase) ########################################
          insertInitHBase(myConf, bak_sql)
          //############################# close the database connection ########################################
          conn.close()
        }
      }
    } catch {
      case e: Exception =>
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Save the raw data to HBase
   * @param myConf
   * @param bak_sql
   */
  def insertInitHBase(myConf: Configuration, bak_sql: DataFrame): Unit = {
    LOG.info("Saving raw data to HBase ----------------------------- start")
    val myTable = new HTable(myConf, TableName.valueOf("market_analysis"))
    myTable.setAutoFlush(false, false)          // key point 1: disable auto-flush
    myTable.setWriteBufferSize(3 * 1024 * 1024) // key point 2: client write buffer
    bak_sql.collect().foreach { y => {
      // Rowkey: reversed yyyyMMdd + HHmm + product name
      var rowkey = new StringBuffer(DateFormatUtils.format(new Date, "yyyyMMdd")).reverse().toString +
        DateFormatUtils.format(new Date, "HHmm") + y(0).toString
      val p = new Put(Bytes.toBytes(rowkey))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("product"), Bytes.toBytes(y.get(0).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("source"), Bytes.toBytes(y.get(1).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("trade_direction"), Bytes.toBytes(y.get(2).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("trade_type"), Bytes.toBytes(y.get(3).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("open_time"), Bytes.toBytes(y.get(4).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("remark"), Bytes.toBytes(y.get(5).toString))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("create_time"), Bytes.toBytes(DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss")))
      p.add(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(y.get(6).toString))
      myTable.put(p)
    }}
    myTable.flushCommits() // key point 3: flush the buffered puts
    LOG.info("Saving raw data to HBase ----------------------------- end")
  }

  /**
   * Save the raw data to MySQL
   * @param conn
   * @param bak_sql
   */
  def insertInitMysql(conn: java.sql.Connection, bak_sql: DataFrame): Unit = {
    LOG.info("Saving raw data to MySQL ----------------------------- start")
    val pstatBackups = conn.prepareStatement("" +
      " INSERT INTO market_analysis ( " +
      " product," +
      " price," +
      " open_time," +
      " trade_type," +
      " trade_direction," +
      " source," +
      " remark," +
      " create_time) VALUES (?,?,?,?,?,?,?,?)")
    var product: String = ""
    var source: String = ""
    var trade_direction: String = ""
    var trade_type: String = ""
    var open_time: String = ""
    var price: String = ""
    var remark: String = ""
    bak_sql.collect().foreach(data => {
      LOG.info("Loop start ------------------------------------")
      product = data.get(0).toString
      source = data.get(1).toString
      trade_direction = data.get(2).toString
      trade_type = data.get(3).toString
      open_time = data.get(4).toString // DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm") + ":00"
      remark = data.get(5).toString
      price = data.get(6).toString
      pstatBackups.setString(1, product)
      pstatBackups.setString(2, price)
      pstatBackups.setString(3, open_time)
      pstatBackups.setString(4, trade_type)
      pstatBackups.setString(5, trade_direction)
      pstatBackups.setString(6, source)
      pstatBackups.setString(7, remark)
      pstatBackups.setString(8, DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss"))
      pstatBackups.executeUpdate()
    })
    LOG.info("Saving raw data to MySQL ----------------------------- end")
    pstatBackups.close()
  }
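  // --- Sketch (not in the original post): insertInitMysql above issues one
  // executeUpdate() per record, i.e. one MySQL round trip each. A batched
  // variant of the same backup insert, using plain JDBC addBatch() and
  // executeBatch(), would look roughly like this; table and column order
  // match insertInitMysql.
  def insertInitMysqlBatched(conn: java.sql.Connection, bak_sql: DataFrame): Unit = {
    val ps = conn.prepareStatement(
      " INSERT INTO market_analysis (product, price, open_time, trade_type," +
        " trade_direction, source, remark, create_time) VALUES (?,?,?,?,?,?,?,?)")
    bak_sql.collect().foreach { data =>
      ps.setString(1, data.get(0).toString) // product
      ps.setString(2, data.get(6).toString) // price
      ps.setString(3, data.get(4).toString) // open_time
      ps.setString(4, data.get(3).toString) // trade_type
      ps.setString(5, data.get(2).toString) // trade_direction
      ps.setString(6, data.get(1).toString) // source
      ps.setString(7, data.get(5).toString) // remark
      ps.setString(8, DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss"))
      ps.addBatch()   // queue the row instead of executing it
    }
    ps.executeBatch() // one round trip for the whole micro-batch
    ps.close()
  }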
  /**
   * Compute the final results for one time window
   * @param conn
   * @param hTable
   * @param loopStartTime
   * @param loopEnd
   */
  def countResult(conn: java.sql.Connection, hTable: org.apache.hadoop.hbase.client.Table,
                  loopStartTime: String, loopEnd: String): Unit = {
    LOG.info("Computing final results ----------------------------- start")
    // SQL that aggregates the statistics
    val sqlContext = ScalaConn.spark
    val sql = sqlContext.sql(
      " select product ," +
        " sum(IF(trade_direction=0 AND trade_type=0,1,0)) as openedBuy," +
        " sum(IF(trade_direction=1 AND trade_type=0,1,0)) as openedSell," +
        " sum(IF(trade_direction=0 AND trade_type=1,1,0)) as closedBuy," +
        " sum(IF(trade_direction=1 AND trade_type=1,1,0)) as closedSell" +
        " from market_analysis stm where open_time >= '" + loopStartTime +
        "' and open_time < '" + loopEnd + "' group by product")
    // Query / insert / update statements
    var query_ps = conn.prepareStatement("select count(1) from hlz_m5 where product=? and theTime=?")
    var insert_ps = conn.prepareStatement("insert into hlz_m5 (product,theTime,openBuy,openSell,closeBuy,closeSell,rOpenBuy,rOpenSell,rCloseBuy,rCloseSell) VALUES (?,?,?,?,?,?,?,?,?,?)")
    var update_ps = conn.prepareStatement("update hlz_m5 set openBuy=?,openSell=?,closeBuy=?,closeSell=?,rOpenBuy=?,rOpenSell=?,rCloseBuy=?,rCloseSell=? where product=? and theTime=?")
    sql.collect().foreach(data => {
      val product = data.get(0).toString
      val openBuy = Integer.parseInt(data.get(1).toString)
      val openSell = Integer.parseInt(data.get(2).toString)
      val closeBuy = Integer.parseInt(data.get(3).toString)
      val closeSell = Integer.parseInt(data.get(4).toString)
      // // Insert a row first, fetch the latest id via LAST_INSERT_ID(), then fix the rowkey
      // var rowkey = new StringBuffer(DateFormatUtils.format(new Date, "yyyyMMdd")).reverse().toString + DateFormatUtils.format(new Date, "HHmm") + product
      // //############################# write the statistics to HBase ################################
      // val p = new Put(Bytes.toBytes(rowkey))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("product"), Bytes.toBytes(product))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("theTime"), Bytes.toBytes(loopStartTime))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("openBuy"), Bytes.toBytes(openBuy.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("openSell"), Bytes.toBytes(openSell.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("closeBuy"), Bytes.toBytes(closeBuy.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("closeSell"), Bytes.toBytes(closeSell.toString))
      //
      //#################################### these values are the ones shown to end users #######################################
      // Add 50 to the real value, then multiply by 3
      var tmp_openBuy = (openBuy + 50) * 3
      var tmp_openSell = (openSell + 50) * 3
      var tmp_closeBuy = (closeBuy + 50) * 3
      var tmp_closeSell = (closeSell + 50) * 3
      var random = new Random().nextInt(99)
      var rOpenBuy = tmp_openBuy + random
      var rOpenSell = tmp_openSell + random
      var rCloseBuy = tmp_closeBuy + random
      var rCloseSell = tmp_closeSell + random
      // Randomized values
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenBuy"), Bytes.toBytes(rOpenBuy.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenSell"), Bytes.toBytes(rOpenSell.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseBuy"), Bytes.toBytes(rCloseBuy.toString))
      // p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseSell"), Bytes.toBytes(rCloseSell.toString))
      // hTable.put(p)

      //************************ save the statistics to MySQL *********************************
      query_ps.setString(1, data.get(0).toString)
      query_ps.setString(2, loopStartTime)
      var rs = query_ps.executeQuery()
      var i = 0
      while (rs.next()) {
        i = rs.getInt(1);
      }
      if (i > 0) {
        LOG.info("Updating")
        update_ps.setInt(1, openBuy)
        update_ps.setInt(2, openSell)
        update_ps.setInt(3, closeBuy)
        update_ps.setInt(4, closeSell)
        update_ps.setInt(5, rOpenBuy)
        update_ps.setInt(6, rOpenSell)
        update_ps.setInt(7, rCloseBuy)
        update_ps.setInt(8, rCloseSell)
        update_ps.setString(9, product)
        update_ps.setString(10, loopStartTime)
        update_ps.executeUpdate()
      } else {
        LOG.info("Inserting")
        insert_ps.setString(1, product)
        insert_ps.setString(2, loopStartTime)
        insert_ps.setInt(3, openBuy)
        insert_ps.setInt(4, openSell)
        insert_ps.setInt(5, closeBuy)
        insert_ps.setInt(6, closeSell)
        insert_ps.setInt(7, rOpenBuy)
        insert_ps.setInt(8, rOpenSell)
        insert_ps.setInt(9, rCloseBuy)
        insert_ps.setInt(10, rCloseSell)
        insert_ps.executeUpdate()
      }
    })
    sql.foreachPartition { x => {
      x.foreach { data => {
        val product = data.get(0).toString
        val openBuy = Integer.parseInt(data.get(1).toString)
        val openSell = Integer.parseInt(data.get(2).toString)
        val closeBuy = Integer.parseInt(data.get(3).toString)
        val closeSell = Integer.parseInt(data.get(4).toString)
        // Insert a row first, fetch the latest id via LAST_INSERT_ID(), then fix the rowkey
        var rowkey = new StringBuffer(DateFormatUtils.format(new Date, "yyyyMMdd")).reverse().toString +
          DateFormatUtils.format(new Date, "HHmm") + product
        //############################# write the statistics to HBase ################################
        val p = new Put(Bytes.toBytes(rowkey))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("product"), Bytes.toBytes(product))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("theTime"), Bytes.toBytes(loopStartTime))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("openBuy"), Bytes.toBytes(openBuy.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("openSell"), Bytes.toBytes(openSell.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("closeBuy"), Bytes.toBytes(closeBuy.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("closeSell"), Bytes.toBytes(closeSell.toString))
        //#################################### these values are the ones shown to end users #######################################
        // Add 50 to the real value, then multiply by 3
        var tmp_openBuy = (openBuy + 50) * 3
        var tmp_openSell = (openSell + 50) * 3
        var tmp_closeBuy = (closeBuy + 50) * 3
        var tmp_closeSell = (closeSell + 50) * 3
        var random = new Random().nextInt(99)
        var rOpenBuy = tmp_openBuy + random
        var rOpenSell = tmp_openSell + random
        var rCloseBuy = tmp_closeBuy + random
        var rCloseSell = tmp_closeSell + random
        // Randomized values
        p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenBuy"), Bytes.toBytes(rOpenBuy.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenSell"), Bytes.toBytes(rOpenSell.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseBuy"), Bytes.toBytes(rCloseBuy.toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseSell"), Bytes.toBytes(rCloseSell.toString))
        hTable.put(p)
      }}
    }}
    // Statistics written to HBase above
    // hTable.flushCommits() // key point 3: flush
    // Close the statements
    query_ps.close()
    insert_ps.close()
    update_ps.close()
    LOG.info("Computing final results ----------------------------- end")
  }
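  // --- Sketch (not in the original post): the user-facing adjustment in
  // countResult, isolated as a pure function to make the arithmetic explicit:
  // displayed = (real + 50) * 3 + noise, with noise drawn from 0..98.
  // e.g. displayValue(10, 7) == (10 + 50) * 3 + 7 == 187
  def displayValue(real: Int, noise: Int): Int = (real + 50) * 3 + noise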
  /**
   * Loop over the time windows and compute the final results for each
   * @param time_sql
   * @param conn
   * @param hTable
   */
  def countTime(time_sql: DataFrame, conn: java.sql.Connection, hTable: org.apache.hadoop.hbase.client.Table): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd H:mm:ss")
    val calendar = Calendar.getInstance();
    time_sql.collect().foreach(data => {
      val startDate: Date = sdf.parse(data(0).toString)
      val endDate: Date = sdf.parse(data(1).toString)
      calendar.setTime(startDate);
      var startYmd: String = DateFormatUtils.format(startDate, "yyyy-MM-dd")
      val startHour: Int = calendar.get(Calendar.HOUR_OF_DAY)
      val startMinute: Int = calendar.get(Calendar.MINUTE)
      calendar.setTime(endDate);
      var endYmd: String = DateFormatUtils.format(endDate, "yyyy-MM-dd")
      val endHour: Int = calendar.get(Calendar.HOUR_OF_DAY)
      val endMinute: Int = calendar.get(Calendar.MINUTE)
      LOG.info("startHour:" + startHour + ",startMinute:" + startMinute + ",endHour:" + endHour + ",endMinute:" + endMinute)
      var loopStartTime: String = ""
      var loopEndTime: String = ""
      var tmpEndYmd = startYmd
      // Start and end in the same hour
      if (endHour == startHour) {
        // e.g. 2016-07-02 23:58:00 .. 2016-07-03 00:10:00 rounds to the
        // 5-minute boundaries 2016-07-02 23:55:00 .. 2016-07-03 00:10:00
        // Work out the window start
        loopStartTime = countStartTime(startMinute, startYmd, startHour)
        // Start and end fall on different days
        if (!startYmd.equals(endYmd)) {
          tmpEndYmd = endYmd
        }
        // Work out the window end
        loopEndTime = countEndTime(endMinute, tmpEndYmd, startHour)
      } else {
        // Work out the window start
        loopStartTime = countStartTime(startMinute, startYmd, startHour)
        // Start and end fall on different days
        if (!startYmd.equals(endYmd)) {
          tmpEndYmd = endYmd
        }
        // Work out the window end
        loopEndTime = countEndTime(endMinute, tmpEndYmd, endHour)
      }
      LOG.info("loopStartTime:-----------" + loopStartTime + ",loopEndTime:-------------" + loopEndTime)
      var loopStart: Long = DateUtil.parseDate(loopStartTime).getTime
      val loopFlag: Long = DateUtil.parseDate(loopEndTime).getTime
      while (loopStart < loopFlag) {
        val loopEnd: String = DateUtil.parseString(new Date(loopStart + dimension * 60 * 1000))
        LOG.info("loopStartTime:" + loopStartTime + "------------,loopEndTime:" + loopEnd)
        // Compute this window
        countResult(conn, hTable, loopStartTime, loopEnd)
        loopStart = loopStart + dimension * 60 * 1000
        loopStartTime = DateUtil.parseString(new Date(loopStart))
      }
    })
  }

  def countStartTime(minute: Int, ymd: String, hour: Int): String = {
    var loopTime: String = ""
    if (minute < dimension) {
      loopTime = ymd + " " + hour + ":00:00"
    } else if (minute >= 5 && minute < 10) {
      loopTime = ymd + " " + hour + ":05:00"
    } else if (minute >= 10 && minute < 15) {
      loopTime = ymd + " " + hour + ":10:00"
    } else if (minute >= 15 && minute < 20) {
      loopTime = ymd + " " + hour + ":15:00"
    } else if (minute >= 20 && minute < 25) {
      loopTime = ymd + " " + hour + ":20:00"
    } else if (minute >= 25 && minute < 30) {
      loopTime = ymd + " " + hour + ":25:00"
    } else if (minute >= 30 && minute < 35) {
      loopTime = ymd + " " + hour + ":30:00"
    } else if (minute >= 35 && minute < 40) {
      loopTime = ymd + " " + hour + ":35:00"
    } else if (minute >= 40 && minute < 45) {
      loopTime = ymd + " " + hour + ":40:00"
    } else if (minute >= 45 && minute < 50) {
      loopTime = ymd + " " + hour + ":45:00"
    } else if (minute >= 50 && minute < 55) {
      loopTime = ymd + " " + hour + ":50:00"
    } else {
      loopTime = ymd + " " + hour + ":55:00"
    }
    loopTime
  }
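  // --- Sketch (not in the original post): countStartTime above and
  // countEndTime below both round a minute value down to a 5-minute
  // boundary, which integer arithmetic expresses more compactly. The window
  // end could then be derived by adding dimension * 60 * 1000 ms to the
  // parsed start, which also handles hour and day rollover for free.
  def slotStart(ymd: String, hour: Int, minute: Int): String = {
    val m = (minute / dimension) * dimension // floor to the 5-minute mark
    f"$ymd $hour%d:$m%02d:00"                // e.g. ("2016-07-02", 23, 58) => "2016-07-02 23:55:00"
  }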
  def countEndTime(minute: Int, ymd: String, hour: Int): String = {
    var loopTime: String = ""
    if (minute < dimension) {
      loopTime = ymd + " " + hour + ":05:00"
    } else if (minute >= 5 && minute < 10) {
      loopTime = ymd + " " + hour + ":10:00"
    } else if (minute >= 10 && minute < 15) {
      loopTime = ymd + " " + hour + ":15:00"
    } else if (minute >= 15 && minute < 20) {
      loopTime = ymd + " " + hour + ":20:00"
    } else if (minute >= 20 && minute < 25) {
      loopTime = ymd + " " + hour + ":25:00"
    } else if (minute >= 25 && minute < 30) {
      loopTime = ymd + " " + hour + ":30:00"
    } else if (minute >= 30 && minute < 35) {
      loopTime = ymd + " " + hour + ":35:00"
    } else if (minute >= 35 && minute < 40) {
      loopTime = ymd + " " + hour + ":40:00"
    } else if (minute >= 40 && minute < 45) {
      loopTime = ymd + " " + hour + ":45:00"
    } else if (minute >= 45 && minute < 50) {
      loopTime = ymd + " " + hour + ":50:00"
    } else if (minute >= 50 && minute < 55) {
      loopTime = ymd + " " + hour + ":55:00"
    } else {
      // Past :55 the window end rolls over into the next hour
      loopTime = ymd + " " + hour + ":00:00"
      val ca: Calendar = Calendar.getInstance
      ca.setTime(DateUtil.parseDate(loopTime))
      ca.add(Calendar.HOUR_OF_DAY, 1)
      loopTime = DateUtil.parseString(ca.getTime)
    }
    loopTime
  }
}

/*
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
*/
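For reference, a minimal standalone sketch of the record format the stream job expects. The field order (product, source, trade_direction, trade_type, open_time, remark, price) is inferred from the x(0)..x(6) indexing in main; the sample values below are made up.

object ParseExample {
  case class market_analysis(product: String, source: String, trade_direction: String,
                             trade_type: String, open_time: String, remark: String, price: String)

  def main(args: Array[String]): Unit = {
    // Hypothetical Kafka message body: seven comma-separated fields
    val line = "EURUSD,web,0,1,2017-03-16 10:02:11,demo,1.0621"
    val w = line.split(",")
    if (w.length >= 7) {
      val rec = market_analysis(w(0), w(1), w(2), w(3), w(4), w(5), w(6))
      println(rec) // market_analysis(EURUSD,web,0,1,2017-03-16 10:02:11,demo,1.0621)
    }
  }
}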
