设为首页 加入收藏

TOP

CDH5.8手动安装spark2.1的运行错误整合spark访问Hbase数据导入mysql详解(一)
2018-01-22 17:25:03 】 浏览:693
Tags:CDH5.8 手动 安装 spark2.1 运行 错误 整合 spark 访问 Hbase 数据 导入 mysql 详解

在CDH5.8上面安装的时候spark1.6 苦于编程打包时的问题(spark1.6使用的是scala2.10 idea上使用2.10进行编译的时候 有时会报错 但是用2.11能编译通过 不过2.11编译的在spark上面运行时就会出现找不到包的问题 有人知道怎么回事请通知我 ) 就给CDH手动升级到了2.1版本。

安装完之后我是保留了 1.6的 2.1的版本使用spark命令时都改成spark2 spark2-submit之类的

使用spark来读取hbase数据 进行操作

case class newsInfo(id:String,subject:String,descripe:String,source:String,sendTime:String)extends Serializable{
    override  def toString: String="%s\t%s\t%s\t%s".format(id,subject,descripe,source,sendTime)
	//初始化sparksession
    val ss = SparkSession.builder.
      appName("RHBTSQL")
      .getOrCreate()

    val tablename = "tablename"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master,node1,node2")
    conf.set("hbase.master", "master")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val scan = new Scan()
    scan.setCacheBlocks(false)
    
    scan.addFamily(Bytes.toBytes("news"))
    scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("subject"))
    scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("contextSplit"))
    scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("descripe"))
    scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("source"))
    scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("sendTime"))

    //设置值过滤
    val filter = new SingleColumnValueFilter(Bytes.toBytes("news"), Bytes.toBytes("sendTime"), CompareOp.EQUAL, Bytes.toBytes("2017-12-13"))
    scan.setFilter(filter)
    conf.set(TableInputFormat.INPUT_TABLE, tablename)
    //将scan类转化成string类型
    val scan_str= ProtobufUtil.toScan(scan)
    val scan_S=Base64.encodeBytes(scan_str.toByteArray())
    conf.set(TableInputFormat.SCAN,scan_S)
    //使用new hadoop api,读取数据,并转成rdd
    val rdd = ss.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

运行读取hbase数据时会出现缺包 以及其他问题 这里是因为 spark2安装上之后并不是CDH整合的 而是区别于原来版本的spark的 所以 这里要

在/etc/spark/conf/目录下把老的spark的classpath.txt spark-env.sh 复制到/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/etc/spark2/conf.dist/下,/etc/spark2/conf是该目录的链接,然后修改spark-env.sh中SPARK_HOME=/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/lib/spark2

读取到的hbase数据RDD再进行数据整理 插入到mysql

一开始试用的是

 def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", 
    "root", "123456")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      }
      )
    } catch {
      case e: Exception => println("Mysql Exception")
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
 
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
    val sc = new SparkContext(conf)
    va
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Mysql组合索引最左侧原理详解 下一篇更改(重置)MySQL用户密码的两种..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目