A utility class I tried implementing myself.
What this class does: through its internal method it
selects the columns to display, based on the given table name, column family, and column qualifiers, and
filters the data with HBase filters, based on the given column family, column qualifier, value, and comparison operator.
Return value: an SQLContext in which the table is already registered, so the sql method can be used directly to query and process the data with SQL. (A minimal usage sketch follows the class.)
The code:
package cn.deppon.Tool
import java.util
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by DreamBoy on 2017/5/12.
*/
object SparkHbaseTool {
/**
* The object body plays the role of a primary constructor:
* it sets up the Spark and HBase environment once, on first use.
*/
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
// Spark configuration
val conf = new SparkConf().setMaster("local[2]")
.setAppName("HbaseTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val hbaseConf = HBaseConfiguration.create()
val sqlContext = new SQLContext(sc)
// HBase configuration
hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
hbaseConf.set("hbase.zookeeper.quorum", "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("hbase.master", "192.168.10.230")
def convertScanToString(scan: Scan) = {
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
/**
*
* @param tbl_nm     HBase table name
* @param show_col   _1 column family, _2 column qualifier, _3 column type (String, Int, Double, Timestamp, ...)
* @param filter_col _1 column family, _2 column qualifier, _3 value to compare against, _4 comparison operator (=, <, >, !=, ...)
* @return an SQLContext in which the table has been registered
*/
def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): SQLContext = {
hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
val scan = new Scan()
/**
* Optionally restrict the scan to the column families and qualifiers
* that should be displayed; add every column that will be used.
*/
/*
val length = show_col.length
for (i <- show_col) {
scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2))
}
*/
// Optionally bound the scan by row key (start and stop row)
//scan.setStartRow(Bytes.toBytes(""))
//scan.setStopRow(Bytes.toBytes(""))
val fil_len = filter_col.length
println("------>>>> number of filter conditions: " + fil_len)
// Only attach a filter when conditions were supplied; otherwise the scan stays unfiltered
if (fil_len > 0) {
val filter_arr = new util.ArrayList[Filter](fil_len)
for (i <- filter_col) {
// Map the operator string onto the matching HBase CompareOp; unknown operators are skipped
val compareOp = i._4 match {
case "=" => Some(CompareFilter.CompareOp.EQUAL)
case "<" => Some(CompareFilter.CompareOp.LESS)
case "<=" => Some(CompareFilter.CompareOp.LESS_OR_EQUAL)
case ">" => Some(CompareFilter.CompareOp.GREATER)
case ">=" => Some(CompareFilter.CompareOp.GREATER_OR_EQUAL)
case "!=" => Some(CompareFilter.CompareOp.NOT_EQUAL)
case _ => None
}
compareOp.foreach { op =>
val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
Bytes.toBytes(i._2), op, new BinaryComparator(Bytes.toBytes(i._3)))
// Drop rows that do not contain the filtered column at all
filter1.setFilterIfMissing(true)
filter_arr.add(filter1)
}
}
/**
* A FilterList carries several filters at once;
* MUST_PASS_ALL means a row must satisfy every filter (logical AND).
*/
val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
scan.setFilter(filterList)
}
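// Example with made-up values: a filter_col entry of ("info", "age", "30", ">=")
// keeps only rows whose info:age cell is >= "30" when the raw bytes are compared
// lexicographically (BinaryComparator compares bytes, not numbers).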
// Build the List[StructField] the StructType needs. The declared column type
// (the third element of show_col) could in principle be mapped onto
// IntegerType, DoubleType, TimestampType, and so on, but HBase hands back raw
// bytes, so every column is decoded as a String and typed as StringType here.
/**
* Purpose of the StructType: it names each value's column when the
* DataFrame is created below, so the names are available once the
* DataFrame is registered as a table.
*/
var list_col: List[StructField] = List()
// The first field is the row key, exposed as column "id"
list_col :+= StructField("id", StringType, true)
for (i <- show_col) {
list_col :+= StructField(i._2, StringType, true)
}
// Assemble the table's schema
val schema = StructType(list_col)
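// For example (hypothetical input), show_col = Array(("info", "name", "String"), ("info", "age", "Int"))
// produces the schema StructType(List(StructField("id", StringType, true),
// StructField("name", StringType, true), StructField("age", StringType, true)))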
// Serialize the scan (including its filter) into the job configuration
hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
// Build the RDD from HBase via TableInputFormat
val hbaseRDD = sc.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
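// Each RDD element is an (ImmutableBytesWritable, Result) pair: the key wraps the
// row key bytes and the Result holds that row's cells.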
// Turn each Result into a Row that matches the schema above
val rowRDD = hbaseRDD.map { case (_, result) =>
var valueSeq: Seq[String] = Seq()
// Read the row key
val key = Bytes.toString(result.getRow)
// Variant without the row key: fetch each cell by family and qualifier
// for (column <- columns) {
//   valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))
// }
// Variant with the row key: the first element of the Row must be the row key,
// matching the leading "id" field of the schema
valueSeq :+= key
for (col <- show_col) {
// getValue returns null for a missing cell; guard it so Bytes.toString
// does not throw a NullPointerException
val value = result.getValue(col._1.getBytes, col._2.getBytes)
valueSeq :+= (if (value == null) null else Bytes.toString(value))
}
Row.fromSeq(valueSeq)
}
// Create the DataFrame and register it under the table name so the caller
// can query it with sqlContext.sql
val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)
hbasedataframe.registerTempTable(tbl_nm)
sqlContext
}
}
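For reference, a minimal usage sketch. The table name, column family, qualifiers, and filter value below are made up for illustration; substitute your own schema:
object SparkHbaseToolDemo {
def main(args: Array[String]): Unit = {
// Hypothetical table "t_user" with family "info" and qualifiers "name" and "age"
val show_col = Array(("info", "name", "String"), ("info", "age", "Int"))
val filter_col = Array(("info", "age", "30", ">="))
// Registers "t_user" as a temp table and returns the SQLContext
val sqlContext = SparkHbaseTool.getTableNm("t_user", show_col, filter_col)
// The row key is exposed as column "id"; every column comes back as a String
sqlContext.sql("SELECT id, name, age FROM t_user").show()
}
}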