TOP

HBaseRDD -- 封装Spark on HBase操作到Spark RDD
2019-03-06 13:48:55 】 浏览:699
Tags:HBaseRDD 封装 Spark HBase 作到 RDD

版权声明:原创作品,如需转载,请注明出处。否则将追究法律责任 https://blog.csdn.net/chenjian_grt/article/details/80347273

最近阅读Spark的源代码,发现Spark使用隐式转换对RDD进行扩展,提供额外的功能。例如PairRDDFunctions对RDD进行扩展,提供诸如reduceByKey等方法。前段时间我们使用Spark操作HBase,由于急着上线,未能对功能进行较好的封装。现在回过头去看,发现其实可以模仿PairRDDFunctions的实现,做一个HBaseRDDFunctions,通过隐式转换,实现对HBase基本操作的封装。

HBaseRDDFunctions的代码实现如下:

import java.io.IOException

import java.util

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.KeyValue.Type

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableInputFormat}

import org.apache.hadoop.hbase.protobuf.ProtobufUtil

import org.apache.hadoop.hbase.util.{Base64, Bytes}

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import scala.language.implicitConversions

import scala.collection.mutable.ListBuffer

/**
 * Created by chenjian on 2018/4/26.
 *
 * Extension methods that add HBase operations to an arbitrary `RDD[T]`.
 *
 * Instances are normally obtained through the implicit conversion
 * `HBaseRDDFunctions.rddToHBaseRDD`, so the methods below appear directly on
 * the RDD. Every method takes a `transfer` function that turns one RDD
 * element into the HBase object (or raw cell bytes) the operation needs.
 *
 * Implementation note: the closures passed to Spark only reference method
 * parameters and locals, never members of this class, because this class
 * is not `Serializable`.
 *
 * @param self the RDD being extended
 * @tparam T element type of the RDD
 */
class HBaseRDDFunctions[T](self: RDD[T]) {

  /**
   * Writes the given, already sorted, key/values as HFiles under
   * `tmpHFilePath` and bulk-loads them into `tableName`.
   *
   * @param sc           Spark context; its Hadoop configuration is the base
   *                     for the HBase configuration and the file system
   * @param tableName    name of the target HBase table
   * @param hbaseRDD     cells to load, keyed by row
   * @param tmpHFilePath directory the HFiles are written to
   */
  private def saveToHBase(sc: SparkContext,
                          tableName: String,
                          hbaseRDD: RDD[(ImmutableBytesWritable, KeyValue)],
                          tmpHFilePath: String
                         ): Unit = {
    val hbaseConf = HBaseConfiguration.create(sc.hadoopConfiguration)
    val hbTableName = TableName.valueOf(tableName)
    val job = Job.getInstance(hbaseConf)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val hfileDir = new Path(tmpHFilePath)

    var conn: Connection = null
    var realTable: Table = null
    var regionLocator: RegionLocator = null

    // deleteOnExit only marks the path for deletion when the FileSystem is
    // closed; it does not remove a directory that already exists.
    // NOTE(review): if the intent was to clear a stale directory before
    // writing, an explicit fs.delete(path, true) would be required - confirm.
    fs.deleteOnExit(hfileDir)
    try {
      conn = ConnectionFactory.createConnection(hbaseConf)
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])
      realTable = conn.getTable(hbTableName)
      regionLocator = conn.getRegionLocator(hbTableName)
      HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator)
      hbaseRDD.saveAsNewAPIHadoopFile(
        tmpHFilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
      val loader = new LoadIncrementalHFiles(hbaseConf)
      loader.doBulkLoad(hfileDir, realTable.asInstanceOf[HTable])
    } finally {
      // Nested try/finally so that a failure in one cleanup step does not
      // prevent the remaining resources from being released.
      try {
        // Registered again here because the directory may only exist now.
        fs.deleteOnExit(hfileDir)
      } finally {
        try {
          if (regionLocator != null) {
            regionLocator.close()
          }
        } finally {
          try {
            if (realTable != null) {
              realTable.close()
            }
          } finally {
            if (conn != null) {
              conn.close()
            }
          }
        }
      }
    }
  }

  /**
   * Bulk-loads one `DeleteColumn` marker per row key produced by `transfer`.
   *
   * Each marker is a `KeyValue` built from the row key only, stamped with
   * the time this method was called.
   * NOTE(review): the marker carries no family/qualifier; confirm that this
   * deletes what callers expect.
   *
   * @param sc           Spark context used for configuration
   * @param tableName    name of the target HBase table
   * @param tmpHFilePath directory the intermediate HFiles are written to
   * @param transfer     maps an element to the row key to delete
   */
  def hbaseBulkDeleteColumn(sc: SparkContext,
                            tableName: String,
                            tmpHFilePath: String,
                            transfer: T => Array[Byte]
                           ): Unit = {
    // Row keys are ordered as unsigned bytes via Bytes.compareTo.
    implicit val rowKeyOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
      override def compare(left: Array[Byte], right: Array[Byte]): Int =
        Bytes.compareTo(left, right)
    }

    val ts = System.currentTimeMillis()
    val deleteMarkers = self
      .map(rd => (transfer(rd), Type.DeleteColumn.getCode))
      .sortByKey()
      .map { case (rowKey, _) =>
        (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, ts, Type.DeleteColumn))
      }

    saveToHBase(sc, tableName, deleteMarkers, tmpHFilePath)
  }

  /**
   * Bulk-loads the cells produced by `transfer` into `tableName`.
   *
   * Cells are sorted by row, then column family, then qualifier before the
   * HFiles are written. All cells get the same timestamp: the time this
   * method was called.
   *
   * @param sc           Spark context used for configuration
   * @param tableName    name of the target HBase table
   * @param tmpHFilePath directory the intermediate HFiles are written to
   * @param transfer     maps an element to a list of
   *                     (rowKey, columnFamily, qualifier, value) tuples
   * @param kvType       KeyValue type written for every cell (default `Put`)
   */
  def hbaseBulkLoad(sc: SparkContext,
                    tableName: String,
                    tmpHFilePath: String,
                    transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])],
                    kvType: Type = Type.Put
                   ): Unit = {
    // Compare row first, then family, then qualifier. Sorting on the plain
    // concatenation row ++ family ++ qualifier is NOT equivalent when row
    // keys have different lengths (e.g. row "a"/family "b" would sort before
    // row "a"/family "c" but after row "aa"/family "a"), which can emit rows
    // out of order to the HFile writer.
    implicit val cellKeyOrdering: Ordering[(Array[Byte], Array[Byte], Array[Byte])] =
      new Ordering[(Array[Byte], Array[Byte], Array[Byte])] {
        override def compare(left: (Array[Byte], Array[Byte], Array[Byte]),
                             right: (Array[Byte], Array[Byte], Array[Byte])): Int = {
          val byRow = Bytes.compareTo(left._1, right._1)
          if (byRow != 0) {
            byRow
          } else {
            val byFamily = Bytes.compareTo(left._2, right._2)
            if (byFamily != 0) byFamily else Bytes.compareTo(left._3, right._3)
          }
        }
      }

    val ts = System.currentTimeMillis()
    val hbaseRDD = self
      .flatMap(transfer)
      .map { case (rowKey, columnFamily, qualifier, value) =>
        ((rowKey, columnFamily, qualifier), value)
      }
      .sortByKey()
      .map { case ((rowKey, columnFamily, qualifier), value) =>
        val kv = new KeyValue(rowKey, columnFamily, qualifier, ts, kvType, value)
        (new ImmutableBytesWritable(rowKey), kv)
      }

    saveToHBase(sc, tableName, hbaseRDD, tmpHFilePath)
  }

  /**
   * Same as [[hbaseBulkLoad]] but writes every cell with type `DeleteColumn`.
   *
   * @param sc           Spark context used for configuration
   * @param tableName    name of the target HBase table
   * @param tmpHFilePath directory the intermediate HFiles are written to
   * @param transfer     maps an element to a list of
   *                     (rowKey, columnFamily, qualifier, value) tuples
   */
  def hbaseBulkDelete(sc: SparkContext,
                      tableName: String,
                      tmpHFilePath: String,
                      transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])]
                     ): Unit = {
    hbaseBulkLoad(sc, tableName, tmpHFilePath, transfer, Type.DeleteColumn)
  }

  /**
   * Executes one `Get` per element and returns the results.
   *
   * Each partition opens its own connection, sends the Gets in batches of
   * at most `num`, collects all results of the partition in memory and
   * closes the connection before the results are handed back.
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the HBase table to read
   * @param transfer  maps an element to the `Get` to execute
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @param num       number of Gets per batch call (default 100)
   * @return one `Result` per executed `Get`
   */
  def hbaseGet(sc: SparkContext,
               tableName: String,
               transfer: T => Get,
               zkQuorum: String,
               zkPort: String,
               num: Int = 100
              ): RDD[Result] = {
    self.map(transfer).mapPartitions(it => {
      val fetched = new ListBuffer[Result]()
      val pending = new util.ArrayList[Row]
      val hbTableName = TableName.valueOf(tableName)
      val hbConf = HBaseConfiguration.create()
      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
      hbConf.set("hbase.zookeeper.quorum", zkQuorum)

      var conn: Connection = null
      var realTable: Table = null
      try {
        conn = ConnectionFactory.createConnection(hbConf)
        realTable = conn.getTable(hbTableName)
        while (it.hasNext) {
          val get = it.next()
          get.setCacheBlocks(false)
          pending.add(get)
          // Flush when the batch is full or when this was the last element.
          if (pending.size >= num || !it.hasNext) {
            val results = new Array[Object](pending.size)
            realTable.batch(pending, results)
            results.foreach(r => fetched += r.asInstanceOf[Result])
            // Without this the same Gets would be re-sent (and their results
            // duplicated) on every following iteration.
            pending.clear()
          }
        }
      } finally {
        if (realTable != null) {
          try {
            // Close the table handle.
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection.
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
      }
      fetched.iterator
    })
  }

  /**
   * Executes one `Delete` per element; delegates to [[hbaseMutation]].
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the target HBase table
   * @param transfer  maps an element to the `Delete` to execute
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @param num       number of mutations per batch call (default 100)
   */
  def hbaseDelete(sc: SparkContext,
                  tableName: String,
                  transfer: T => Delete,
                  zkQuorum: String,
                  zkPort: String,
                  num: Int = 100
                 ): Unit = {
    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
  }

  /**
   * Executes one `Mutation` per element.
   *
   * Each partition opens its own connection and sends the mutations in
   * batches of at most `num`.
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the target HBase table
   * @param transfer  maps an element to the `Mutation` to execute
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @param num       number of mutations per batch call (default 100)
   */
  def hbaseMutation(sc: SparkContext,
                    tableName: String,
                    transfer: T => Mutation,
                    zkQuorum: String,
                    zkPort: String,
                    num: Int = 100
                   ): Unit = {
    self.foreachPartition(it => {
      val hbConf = HBaseConfiguration.create()
      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
      hbConf.set("hbase.zookeeper.quorum", zkQuorum)
      val hbTableName = TableName.valueOf(tableName)

      var conn: Connection = null
      var realTable: Table = null
      try {
        conn = ConnectionFactory.createConnection(hbConf)
        realTable = conn.getTable(hbTableName)
        val pending = new java.util.ArrayList[Mutation]
        while (it.hasNext) {
          pending.add(transfer(it.next()))
          // Flush when the batch is full or when this was the last element.
          if (pending.size >= num || !it.hasNext) {
            realTable.batch(pending, null)
            pending.clear()
          }
        }
      } finally {
        if (realTable != null) {
          try {
            // Close the table handle.
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection.
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
      }
    })
  }

  /**
   * Runs one `Scan` per element and returns every row the scans produce.
   *
   * Each partition opens its own connection, runs its scans one after the
   * other, collects all rows of the partition in memory and closes the
   * connection before the rows are handed back.
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the HBase table to scan
   * @param transfer  maps an element to the `Scan` to run
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @return all rows returned by all scans
   */
  def hbaseScan(sc: SparkContext,
                tableName: String,
                transfer: T => Scan,
                zkQuorum: String,
                zkPort: String
               ): RDD[Result] = {
    self.mapPartitions(it => {
      val collected = new ListBuffer[Result]()
      val hbConf = HBaseConfiguration.create()
      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
      hbConf.set("hbase.zookeeper.quorum", zkQuorum)
      val hbTableName = TableName.valueOf(tableName)

      var conn: Connection = null
      var realTable: Table = null
      try {
        conn = ConnectionFactory.createConnection(hbConf)
        realTable = conn.getTable(hbTableName)
        while (it.hasNext) {
          val scan = transfer(it.next())
          scan.setCacheBlocks(false)
          val scanner = realTable.getScanner(scan)
          // Close every scanner as soon as it is drained; keeping only the
          // last one in a variable would leak all the earlier scanners.
          try {
            val rows = scanner.iterator
            while (rows.hasNext) {
              collected += rows.next()
            }
          } finally {
            scanner.close()
          }
        }
      } finally {
        if (realTable != null) {
          try {
            // Close the table handle.
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection.
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
      }
      collected.iterator
    })
  }

  /**
   * Executes one `Put` per element; delegates to [[hbaseMutation]].
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the target HBase table
   * @param transfer  maps an element to the `Put` to execute
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @param num       number of mutations per batch call (default 100)
   */
  def hbasePut(sc: SparkContext,
               tableName: String,
               transfer: T => Put,
               zkQuorum: String,
               zkPort: String,
               num: Int = 100
              ): Unit = {
    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
  }

  /**
   * Executes one `Increment` per element; delegates to [[hbaseMutation]].
   *
   * @param sc        unused by this method; kept for API symmetry
   * @param tableName name of the target HBase table
   * @param transfer  maps an element to the `Increment` to execute
   * @param zkQuorum  value for `hbase.zookeeper.quorum`
   * @param zkPort    value for `hbase.zookeeper.property.clientPort`
   * @param num       number of mutations per batch call (default 100)
   */
  def hbaseIncrement(sc: SparkContext,
                     tableName: String,
                     transfer: T => Increment,
                     zkQuorum: String,
                     zkPort: String,
                     num: Int = 100
                    ): Unit = {
    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
  }

}

object HBaseRDDFunctions {

  /**
   * Creates an RDD over `tableName` that is read through `TableInputFormat`
   * using the given scan.
   *
   * Block caching is switched off on the passed-in `scan` (the caller's
   * object is modified), then the scan is serialized to Base64 and handed
   * to the input format through the configuration.
   *
   * @param sc        Spark context; its Hadoop configuration is the base for
   *                  the HBase configuration
   * @param tableName name of the HBase table to read
   * @param scan      scan describing what to read
   * @return pairs of row key and row result
   */
  def fromScan(sc: SparkContext,
               tableName: String,
               scan: Scan
              ): RDD[(ImmutableBytesWritable, Result)] = {
    scan.setCacheBlocks(false)
    val encodedScan = Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)

    val inputConf = HBaseConfiguration.create(sc.hadoopConfiguration)
    inputConf.set(TableInputFormat.INPUT_TABLE, tableName)
    inputConf.set(TableInputFormat.SCAN, encodedScan)

    sc.newAPIHadoopRDD(
      inputConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
  }

  /**
   * Implicit view that makes the HBase operations of
   * `HBaseRDDFunctions` available on any `RDD[T]`.
   */
  implicit def rddToHBaseRDD[T](rdd: RDD[T]): HBaseRDDFunctions[T] =
    new HBaseRDDFunctions(rdd)

}

注:HBase的版本为1.0.2, Spark的版本为2.1.0

使用方法:

1、导入隐式转换函数(例如:import HBaseRDDFunctions._,若该对象位于某个包中,需加上包名前缀):

2、导入后,spark rdd将得到扩展,出现我们自己开发的Spark on HBase的方法(例如:rdd.hbasePut(sc, tableName, transfer, zkQuorum, zkPort)):


HBaseRDD -- 封装Spark on HBase操作到Spark RDD https://www.cppentry.com/bencandy.php?fid=118&id=211976

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hbase   snapshot 下一篇hbase的JavaAPI操作:连接、创建..