设为首页 加入收藏

TOP

spark与hbase进行交互
2018-11-29 02:48:21 】 浏览:29
Tags:spark hbase 进行 交互

软件环境:

spark-1.2.1

hadoop-2.5.2

hbase-1.0.0


1. 搭好上述开发环境

2. 在IntelliJ IDEA中创建scala项目,从hbase-1.0.0/lib/目录中导入如下包:hbase-client-1.0.0.jar, hbase-common-1.0.0.jar, hbase-server-1.0.0.jar, hbase-protocol-1.0.0.jar, guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar

(注:hbase-*的jar包是编写程序所需要;guava包如果不导入,运行时会出现找不到com/google/common/collect/ListMultimap的错误;htrace包如果不导入,运行时会出现找不到org/apache/htrace/Trace的错误)

3. 编写程序

/**
 * Created by tifctu on 3/3/15.
 */
package com.founder.big.spark

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{Put, HBaseAdmin, Scan, Result}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Base64

class HBaseConnector(sc: SparkContext, table: String) extends Serializable {

  val sqlContext = new SQLContext(sc)

  val conf = HBaseConfiguration.create()
  conf.set("hbase.master", "cloud00:16030")
  conf.addResource("/cloud/hbase-1.0.0/conf/hbase-site.xml")
  conf.set(TableInputFormat.INPUT_TABLE, table)

  //val scan = new Scan()
  //scan.setStartRow(Bytes.toBytes("1988"))
  //scan.setStopRow(Bytes.toBytes("2015"))
  //scan.addFamily(Bytes.toBytes("cf"))
  //scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("did"))
  //val proto = ProtobufUtil.toScan(scan)
  //val scanToString = Base64.encodeBytes(proto.toByteArray)
  //conf.set(TableInputFormat.SCAN, scanToString)

  val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
  println(hBaseRDD.count())

  /*
  val rowkey = "tifctu"
  val put: Put = new Put(rowkey)
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("did"), Bytes.toBytes("cf"), Bytes.toBytes("value"))
  conf.write(new ImmutableBytesWritable(rowkey), put)
  */
}

object HBaseConnector {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseTest")
    conf.setMaster("spark://cloud06:7077")
    val sc = new SparkContext(conf)
    new HBaseConnector(sc, "mytable")
  }
}

4. 创建应用jar包,在IntelliJ IDEA的Build -> Build Artifacts -> Edit中,将上述所导入的jar包以Extracted Directory的方式添加到Output Layout中,将源程序及第三方jar包一起打包。

如果不将第三方jar包打到应用中,可以在spark-submit运行应用时以 --jars 的方式导入,格式如下:--jars ./lib/hbase-client-1.0.0.jar,./lib/hbase-common-1.0.0.jar,./lib/hbase-server-1.0.0.jar,./lib/hbase-protocol-1.0.0.jar,./lib/guava-12.0.1.jar,./lib/htrace-core-3.1.0-incubating.jar

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase随笔记录权限控制、命名空间.. 下一篇Hbase数据的不同版本号

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目