设为首页 加入收藏

TOP

kafkaToHbase
2018-12-05 18:37:47 】 浏览:8
Tags:kafkaToHbase
package kafka


import java.sql.{DriverManager}
import java.text.SimpleDateFormat
import java.util.{Properties, Calendar, Date}

import DAO.{ScalaHbase, ScalaConn}
import kafka.serializer.StringDecoder
import org.apache.commons.lang.time.DateFormatUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.hadoop.hbase.client.{Table, Connection,ConnectionFactory}
import java.util.HashMap;
import util.DateUtil

import scala.util.Random
import org.slf4j.LoggerFactory

/**
 * Spark Streaming job: consumes comma-separated trade records from the Kafka
 * topic "marketAnalysis", backs up the raw rows to MySQL and HBase, and
 * writes per-product 5-minute trade-volume aggregates to MySQL/HBase.
 *
 * Created by 谭志坚 (Tan Zhijian) on 2017/3/16.
 */
object kafkaToHbase {
  def LOG = LoggerFactory.getLogger(getClass)

    // Aggregation window size in minutes; also drives the streaming batch interval.
    val dimension: Int = 5
    // One parsed trade record; each Kafka message is a 7-column CSV line in this order.
    case class market_analysis(product: String, source: String, trade_direction: String, trade_type: String, open_time: String,remark : String,price : String)
    /**
     * Entry point: reads trade records from Kafka via Spark Streaming in
     * `dimension * 60`-second batches, registers each batch as a temp view,
     * aggregates it in 5-minute windows, and persists both raw and aggregated
     * data to MySQL and HBase.
     *
     * @param args optional; args(0) overrides the Spark master URL
     *             (yarn, spark standalone, or local). Default: local[1].
     */
    def main (args : Array[String]) {
      // MySQL connection settings.
      // NOTE(review): credentials are hard-coded; consider externalizing them.
      val url = "jdbc:mysql://12.168.55.218:3306/hadoop"
      val user = "root"
      val password = "dzwang**"
      // Run mode: yarn, spark standalone or local.
      var masterUrl = "local[1]"
      if (args.length > 0) {
        masterUrl = args(0)
      }

      val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("kafkaToHbase")
      val sc = new SparkContext(sparkConf)
      // One micro-batch every dimension * 60 seconds.
      val ssc = new StreamingContext(sc, Seconds(dimension * 60))
      // Kafka topic to consume.
      val topics = Set("marketAnalysis")
      // Kafka broker list.
      val brokers = "192.168.100.110:9092,192.168.100.111:9092,192.168.100.112:9092"
      // Fixed Kafka producer configuration.
      val props = new Properties()
      props.put("metadata.broker.list", brokers)
      props.put("serializer.class", "org.apache.kafka.common.serialization.ByteArraySerializer")
      props.put("bootstrap.servers", brokers)
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
      // NOTE(review): this producer is created but never used below; kept to
      // preserve behavior, consider removing it together with `props`.
      val producer: KafkaProducer[String,String] = new KafkaProducer[String, String](props)

      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "org.apache.kafka.common.serialization.ByteArraySerializer")

      val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      // Keep only the message payload (the value of each Kafka record).
      val events = kafkaStream.flatMap(line => {
        Some(line._2.toString())
      })
      try {
        // FIX: market_analysis has 7 fields (indices 0-6); the original
        // filter(_.length >= 5) let short rows through, which crashed with
        // ArrayIndexOutOfBoundsException on x(5)/x(6).
        val tmpdf = events.map(_.split(",")).filter(_.length >= 7).map(x => market_analysis(x(0), x(1), x(2), x(3), x(4), x(5), x(6)))
        tmpdf.foreachRDD { rdd =>
          val spark = ScalaConn.spark
          import spark.implicits._
          val dataFrame = rdd.toDF()
          // Register the current batch as a temp view for the SQL queries below.
          dataFrame.createOrReplaceTempView("market_analysis")

          // Raw rows of the current batch (for backup).
          val bak_sql = spark.sql("select product,source,trade_direction,trade_type,open_time,remark,price from market_analysis")

          // Min/max open_time of the current batch; drives the aggregation loop.
          val time_sql = spark.sql("select min(open_time),max(open_time) from market_analysis")

          if (bak_sql.collect().length > 0) {
            // ----- aggregated results go to HBase table dw_tradevolume_m5 -----
            val myConf: Configuration = ScalaConn.getHbaseConf
            val hbaseconn: Connection = ConnectionFactory.createConnection(myConf)
            ScalaHbase.createTable(myConf, hbaseconn, "dw_tradevolume_m5", null)
            val hTable: Table = hbaseconn.getTable(TableName.valueOf("dw_tradevolume_m5"))

            val conn: java.sql.Connection = DriverManager.getConnection(url, user, password)
            try {
              // Walk the batch's time range in `dimension`-minute windows and
              // write aggregated counts to MySQL/HBase.
              countTime(time_sql, conn, hTable)

              // Back up the raw batch to MySQL.
              insertInitMysql(conn, bak_sql)

              // Back up the raw batch to HBase.
              insertInitHBase(myConf, bak_sql)
            } finally {
              // FIX: release all connections even when a step fails; the
              // original leaked hTable and hbaseconn on every batch.
              conn.close()
              hTable.close()
              hbaseconn.close()
            }
          }

        }
      } catch {
        // FIX: the original swallowed every exception silently; log it so
        // failures are at least visible in the driver log.
        case e: Exception => LOG.error("kafkaToHbase streaming job failed", e)
      }
      ssc.start()
      ssc.awaitTermination()
    }

    /**
     * Backs up one batch of raw rows into the HBase table `market_analysis`.
     * Row key: reversed yyyyMMdd + HHmm + product name (the reversed date
     * spreads writes across regions; same-minute rows stay adjacent).
     *
     * @param myConf  HBase configuration
     * @param bak_sql DataFrame with columns (product, source, trade_direction,
     *                trade_type, open_time, remark, price) in that order
     */
    def insertInitHBase(myConf : Configuration, bak_sql : DataFrame): Unit = {
      // FIX: the original start-log said "结束" (end); it marks the beginning.
      LOG.info("初始数据保存到hbase-----------------------------开始")
      val myTable = new HTable(myConf, TableName.valueOf("market_analysis"))
      myTable.setAutoFlush(false, false)          // batch the puts client-side
      myTable.setWriteBufferSize(3 * 1024 * 1024) // 3 MB client write buffer
      bak_sql.collect().foreach { y =>
        // Row key: reversed date + hour/minute + product name.
        val rowkey = new StringBuffer(DateFormatUtils.format(new Date, "yyyyMMdd")).reverse().toString + DateFormatUtils.format(new Date, "HHmm") + y.get(0).toString
        val p = new Put(Bytes.toBytes(rowkey))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("product"), Bytes.toBytes(y.get(0).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("source"), Bytes.toBytes(y.get(1).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("trade_direction"), Bytes.toBytes(y.get(2).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("trade_type"), Bytes.toBytes(y.get(3).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("open_time"), Bytes.toBytes(y.get(4).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("remark"), Bytes.toBytes(y.get(5).toString))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("create_time"), Bytes.toBytes(DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss")))
        p.add(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(y.get(6).toString))
        myTable.put(p)
      }
      myTable.flushCommits() // flush the buffered puts
      // FIX: close the table to release client resources (original leaked it).
      myTable.close()
      LOG.info("初始数据保存到HBASE-----------------------------结束")
    }

    /**
     * Backs up one batch of raw rows into the MySQL table `market_analysis`.
     *
     * @param conn    open JDBC connection; closed by the caller
     * @param bak_sql DataFrame with columns (product, source, trade_direction,
     *                trade_type, open_time, remark, price) in that order
     */
    def insertInitMysql(conn: java.sql.Connection, bak_sql : DataFrame): Unit = {
      LOG.info("初始数据保存到mysql-----------------------------开始")
      // FIX: the original statement read "VALUES (,,,,,,,)" — empty parameter
      // markers are invalid SQL; restore the JDBC '?' placeholders.
      val pstatBackups = conn.prepareStatement("" +
        "  INSERT INTO market_analysis (  " +
        "  product," +
        "  price," +
        "  open_time," +
        "  trade_type," +
        "  trade_direction," +
        "  source," +
        "  remark," +
        "  create_time) VALUES (?,?,?,?,?,?,?,?)")
      try {
        bak_sql.collect().foreach(data => {
          LOG.info("循环开始------------------------------------")
          val product = data.get(0).toString
          val source = data.get(1).toString
          val trade_direction = data.get(2).toString
          val trade_type = data.get(3).toString
          val open_time = data.get(4).toString
          val remark = data.get(5).toString
          val price = data.get(6).toString
          pstatBackups.setString(1, product)
          pstatBackups.setString(2, price)
          pstatBackups.setString(3, open_time)
          pstatBackups.setString(4, trade_type)
          pstatBackups.setString(5, trade_direction)
          pstatBackups.setString(6, source)
          pstatBackups.setString(7, remark)
          pstatBackups.setString(8, DateFormatUtils.format(new Date, "yyyy-MM-dd HH:mm:ss"))
          pstatBackups.executeUpdate()
        })
        LOG.info("初始数据保存到mysql-----------------------------结束")
      } finally {
        // FIX: always release the statement, even if an insert fails.
        pstatBackups.close()
      }
    }

    /**
     * Aggregates one [loopStartTime, loopEnd) window of the current batch's
     * `market_analysis` temp view into per-product open/close buy/sell counts
     * and upserts them into the MySQL table `hlz_m5` plus the HBase table
     * bound to `hTable`.
     *
     * @param conn          open JDBC connection; closed by the caller
     * @param hTable        HBase table receiving the aggregated rows
     * @param loopStartTime window start "yyyy-MM-dd HH:mm:ss" (inclusive)
     * @param loopEnd       window end "yyyy-MM-dd HH:mm:ss" (exclusive)
     */
    def countResult(conn: java.sql.Connection, hTable : org.apache.hadoop.hbase.client.Table, loopStartTime : String, loopEnd : String): Unit = {
      // FIX: the original start-log said "结束" (end); it marks the beginning.
      LOG.info("计算最终结果-----------------------------开始")
      // Per-product counts of opened/closed buys/sells inside the window.
      // trade_direction: 0 = buy, 1 = sell; trade_type: 0 = open, 1 = close
      // (as encoded by the IF() conditions below).
      val sqlContext = ScalaConn.spark
      val sql = sqlContext.sql(
        " select product ," +
          " sum(IF(trade_direction=0 AND trade_type=0,1,0)) as openedBuy," +
          " sum(IF(trade_direction=1 AND trade_type=0,1,0)) as openedSell," +
          " sum(IF(trade_direction=0 AND trade_type=1,1,0)) as closedBuy," +
          " sum(IF(trade_direction=1 AND trade_type=1,1,0)) as closedSell" +
          " from market_analysis stm where open_time >= '" + loopStartTime + "' and open_time < '" + loopEnd + "' group by product")

      // FIX: the original JDBC statements had empty parameter markers
      // ("product= and theTime=", "VALUES (,,,,,,,,,)"), which is invalid SQL;
      // restore the '?' placeholders.
      val query_ps = conn.prepareStatement("select count(1) from hlz_m5 where product=? and theTime=?")
      val insert_ps = conn.prepareStatement("insert into hlz_m5 (product,theTime,openBuy,openSell,closeBuy,closeSell,rOpenBuy,rOpenSell,rCloseBuy,rCloseSell) VALUES (?,?,?,?,?,?,?,?,?,?)")
      val update_ps = conn.prepareStatement("update hlz_m5 set openBuy=?,openSell=?,closeBuy=?,closeSell=?,rOpenBuy=?,rOpenSell=?,rCloseBuy=?,rCloseSell=? where product=? and theTime=?")
      sql.collect().foreach(data => {
        val product = data.get(0).toString
        val openBuy = Integer.parseInt(data.get(1).toString)
        val openSell = Integer.parseInt(data.get(2).toString)
        val closeBuy = Integer.parseInt(data.get(3).toString)
        val closeSell = Integer.parseInt(data.get(4).toString)

        // User-facing figures: (real + 50) * 3 plus one shared random offset
        // in [0, 99) per product row.
        val tmp_openBuy = (openBuy + 50) * 3
        val tmp_openSell = (openSell + 50) * 3
        val tmp_closeBuy = (closeBuy + 50) * 3
        val tmp_closeSell = (closeSell + 50) * 3

        val random = new Random().nextInt(99)
        val rOpenBuy = tmp_openBuy + random
        val rOpenSell = tmp_openSell + random
        val rCloseBuy = tmp_closeBuy + random
        val rCloseSell = tmp_closeSell + random

        // ---------- upsert the aggregated row into MySQL ----------
        // Update when a (product, theTime) row already exists, insert otherwise.
        query_ps.setString(1, product)
        query_ps.setString(2, loopStartTime)
        val rs = query_ps.executeQuery()
        var i = 0
        while (rs.next()) {
          i = rs.getInt(1)
        }
        // FIX: close the ResultSet (the original leaked one per product).
        rs.close()

        if (i > 0) {
          LOG.info("修改")
          update_ps.setInt(1, openBuy)
          update_ps.setInt(2, openSell)
          update_ps.setInt(3, closeBuy)
          update_ps.setInt(4, closeSell)
          update_ps.setInt(5, rOpenBuy)
          update_ps.setInt(6, rOpenSell)
          update_ps.setInt(7, rCloseBuy)
          update_ps.setInt(8, rCloseSell)
          update_ps.setString(9, product)
          update_ps.setString(10, loopStartTime)
          update_ps.executeUpdate()
        } else {
          LOG.info("新增")
          insert_ps.setString(1, product)
          insert_ps.setString(2, loopStartTime)
          insert_ps.setInt(3, openBuy)
          insert_ps.setInt(4, openSell)
          insert_ps.setInt(5, closeBuy)
          insert_ps.setInt(6, closeSell)
          insert_ps.setInt(7, rOpenBuy)
          insert_ps.setInt(8, rOpenSell)
          insert_ps.setInt(9, rCloseBuy)
          insert_ps.setInt(10, rCloseSell)
          insert_ps.executeUpdate()
        }
      })
      // ---------- mirror the aggregated rows into HBase ----------
      // NOTE(review): `hTable` is captured inside foreachPartition and would
      // have to be serialized to executors — confirm this is only run with a
      // local master, otherwise move the table creation inside the partition.
      sql.foreachPartition {
        partition => {
          partition.foreach {
            data => {
              val product = data.get(0).toString
              val openBuy = Integer.parseInt(data.get(1).toString)
              val openSell = Integer.parseInt(data.get(2).toString)
              val closeBuy = Integer.parseInt(data.get(3).toString)
              val closeSell = Integer.parseInt(data.get(4).toString)

              // Row key: reversed yyyyMMdd + HHmm + product name.
              val rowkey = new StringBuffer(DateFormatUtils.format(new Date, "yyyyMMdd")).reverse().toString + DateFormatUtils.format(new Date, "HHmm") + product
              val p = new Put(Bytes.toBytes(rowkey))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("product"), Bytes.toBytes(product))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("theTime"), Bytes.toBytes(loopStartTime))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("openBuy"), Bytes.toBytes(openBuy.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("openSell"), Bytes.toBytes(openSell.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("closeBuy"), Bytes.toBytes(closeBuy.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("closeSell"), Bytes.toBytes(closeSell.toString))

              // User-facing figures, same formula as the MySQL branch above
              // (note: a fresh random offset is drawn here, so HBase and MySQL
              // r* values can differ — preserved from the original).
              val tmp_openBuy = (openBuy + 50) * 3
              val tmp_openSell = (openSell + 50) * 3
              val tmp_closeBuy = (closeBuy + 50) * 3
              val tmp_closeSell = (closeSell + 50) * 3

              val random = new Random().nextInt(99)
              val rOpenBuy = tmp_openBuy + random
              val rOpenSell = tmp_openSell + random
              val rCloseBuy = tmp_closeBuy + random
              val rCloseSell = tmp_closeSell + random
              p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenBuy"), Bytes.toBytes(rOpenBuy.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("rOpenSell"), Bytes.toBytes(rOpenSell.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseBuy"), Bytes.toBytes(rCloseBuy.toString))
              p.add(Bytes.toBytes("info"), Bytes.toBytes("rCloseSell"), Bytes.toBytes(rCloseSell.toString))
              hTable.put(p)
            }
          }
        }
      }

      // Release the prepared statements (the connection stays open for the caller).
      query_ps.close()
      insert_ps.close()
      update_ps.close()
      LOG.info("计算最终结果-----------------------------结束")
    }

    /**
     * Walks the batch's [min(open_time), max(open_time)] range in
     * `dimension`-minute steps and calls countResult for each window.
     *
     * @param time_sql single-row DataFrame: (min(open_time), max(open_time)),
     *                 both formatted "yyyy-MM-dd H:mm:ss"
     * @param conn     open JDBC connection, passed through to countResult
     * @param hTable   HBase table, passed through to countResult
     */
    def countTime(time_sql : DataFrame,conn:  java.sql.Connection,hTable : org.apache.hadoop.hbase.client.Table): Unit ={
      val sdf = new SimpleDateFormat("yyyy-MM-dd H:mm:ss")
      val calendar = Calendar.getInstance();

      time_sql.collect().foreach(data => {
        // data(0)/data(1) are the batch's min/max open_time
        val startDate: Date = sdf.parse(data(0).toString)
        val endDate: Date = sdf.parse(data(1).toString)
        calendar.setTime(startDate);

        var startYmd: String =  DateFormatUtils.format(startDate, "yyyy-MM-dd")
        val startHour: Int = calendar.get(Calendar.HOUR_OF_DAY)
        val startMinute: Int = calendar.get(Calendar.MINUTE)

        calendar.setTime(endDate);
        var endYmd: String =  DateFormatUtils.format(endDate, "yyyy-MM-dd")
        val endHour: Int = calendar.get(Calendar.HOUR_OF_DAY)
        val endMinute: Int = calendar.get(Calendar.MINUTE)

        LOG.info("startHour:" + startHour + ",startMinute:" + startMinute + ",endHour:" + endHour + ",endMinute:" + endMinute)


        var loopStartTime: String = ""
        var loopEndTime: String = ""
        var tmpEndYmd = startYmd
        // Same hour vs. different hour only changes which hour is fed into
        // countEndTime; both branches otherwise follow the same steps.
        if (endHour == startHour) {
          // Floor the range start to its 5-minute window boundary.
          loopStartTime = countStartTime(startMinute,startYmd,startHour)
          // If start and end fall on different calendar days, end on the end day.
          if(!startYmd.equals(endYmd)){
            tmpEndYmd = endYmd
          }
          // Ceil the range end to the next 5-minute window boundary.
          loopEndTime = countEndTime(endMinute,tmpEndYmd,startHour)
        }else {
          // Floor the range start to its 5-minute window boundary.
          loopStartTime = countStartTime(startMinute,startYmd,startHour)
          // If start and end fall on different calendar days, end on the end day.
          if(!startYmd.equals(endYmd)){
            tmpEndYmd = endYmd
          }
          // Ceil the range end to the next 5-minute window boundary.
          loopEndTime = countEndTime(endMinute,tmpEndYmd,endHour)
        }
        LOG.info("loopStartTime:-----------" + loopStartTime + ",loopEndTime:-------------" + loopEndTime)


        var loopStart: Long = DateUtil.parseDate(loopStartTime).getTime

        val loopFlag: Long = DateUtil.parseDate(loopEndTime).getTime

        // Step through the range one `dimension`-minute window at a time,
        // aggregating each [loopStartTime, loopEnd) slice.
        while (loopStart < loopFlag) {
          val loopEnd: String = DateUtil.parseString(new Date(loopStart + dimension * 60 * 1000))
          LOG.info("loopStartTime:" + loopStartTime + "------------,loopEndTime:" + loopEnd)

          // Aggregate this window into MySQL/HBase.
          countResult(conn,hTable, loopStartTime, loopEnd)

          loopStart = loopStart + dimension * 60 * 1000
          loopStartTime = DateUtil.parseString(new Date(loopStart))
        }
      })

    }


    /**
     * Floors a timestamp's minute to the start of its 5-minute window,
     * e.g. minute 7 -> "ymd hour:05:00".
     *
     * FIX: the original chain of range checks mishandled exact boundaries —
     * minute == 5, 10, ..., 50 matched none of the open intervals
     * (e.g. `minute > 5 && minute < 10`) and fell through to the final else,
     * returning ":55:00". Simple arithmetic flooring covers every minute.
     *
     * @param minute minute-of-hour, 0-59
     * @param ymd    date string "yyyy-MM-dd"
     * @param hour   hour-of-day (rendered unpadded, matching the original)
     * @return "ymd hour:MM:00" with MM floored to a multiple of 5
     */
    def countStartTime(minute : Int,ymd : String,hour : Int): String ={
      // Floor to the 5-minute bucket: 0, 5, 10, ..., 55.
      val bucket = minute - minute % 5
      f"$ymd $hour%d:$bucket%02d:00"
    }

    /**
     * Returns the exclusive end of the 5-minute window containing `minute`:
     * the minute ceiled to the next multiple of 5 (e.g. minute 7 -> ":10:00").
     * When the window ends at the top of the next hour, the hour (and, via
     * Calendar, the date) is rolled forward.
     *
     * FIX: the original chain of range checks mishandled exact boundaries —
     * minute == 5, 10, ..., 50 matched none of the open intervals and fell
     * into the hour-rollover branch. Arithmetic ceiling covers every minute;
     * only minutes 55-59 roll the hour, as intended.
     *
     * @param minute minute-of-hour, 0-59
     * @param ymd    date string "yyyy-MM-dd"
     * @param hour   hour-of-day (rendered unpadded, matching the original)
     * @return "ymd hour:MM:00" ceiled to the next multiple of 5, or the
     *         DateUtil-formatted rolled-over timestamp for minutes 55-59
     */
    def countEndTime(minute : Int,ymd : String,hour : Int): String ={
      // Ceil to the next 5-minute boundary: 5, 10, ..., 60.
      val bucket = minute - minute % 5 + 5
      if (bucket < 60) {
        f"$ymd $hour%d:$bucket%02d:00"
      } else {
        // Window ends at the top of the next hour; roll the hour (and date,
        // when hour == 23) forward via Calendar, formatted by DateUtil as in
        // the original. NOTE(review): DateUtil.parseString's output format is
        // assumed to match the "yyyy-MM-dd H:mm:ss" strings used elsewhere —
        // confirm against util.DateUtil.
        var loopTime = ymd + " " + hour + ":00:00"
        val ca: Calendar = Calendar.getInstance
        ca.setTime(DateUtil.parseDate(loopTime))
        ca.add(Calendar.HOUR_OF_DAY, 1)
        loopTime = DateUtil.parseString(ca.getTime)
        loopTime
      }
    }
  }

  /*
  object SQLContextSingleton {
    @transient  private var instance: SQLContext = _
    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }*/


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka's Metadata In ZooKeep.. 下一篇消息中间件Kafka Win10安装配置

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }