设为首页 加入收藏

TOP

HBase1.1.2增删改查scala代码实现
2018-12-07 01:45:25 】 浏览:98
Tags:HBase1.1.2 删改 scala 代码 实现
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/sinat_35045195/article/details/76888583

增删改查工具类


package org.apache.hadoop.conf
import java.util

import com.google.protobuf.GeneratedMessage
import org.apache.commons.codec.language.bm.Lang
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.{CompareFilter, Filter, FilterBase}
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ArrayBuffer



/**
  * Created by Administrator on 2017/8/3.
  */
class HbaseUtils {
  val comp = new util.HashMap[String,CompareFilter.CompareOp]()
  comp.put("EQUAL",CompareFilter.CompareOp.EQUAL)
  comp.put("GREATER",CompareFilter.CompareOp.GREATER)
  comp.put("GREATER_OR_EQUAL",CompareFilter.CompareOp.GREATER_OR_EQUAL)
  comp.put("LESS",CompareFilter.CompareOp.LESS)
  comp.put("LESS_OR_EQUAL",CompareFilter.CompareOp.LESS_OR_EQUAL)
  comp.put("NO_OP",CompareFilter.CompareOp.NO_OP)
  comp.put("NOT_EQUAL",CompareFilter.CompareOp.NOT_EQUAL)

  def getAdmin(conf:Configuration):HBaseAdmin ={
    val connection = ConnectionFactory.createConnection(conf)
    connection.getAdmin().asInstanceOf[HBaseAdmin]
  }

  /**
    * @see 根据指定的管理员,表名,列族名称创建表
    * @param admin HBaseAdmin 创建的HBaseAdmin对象
    * @param tName String 需要创建的表名
    * @param columnNames List[String] 列族名称的集合
    */
  def createTable(admin: HBaseAdmin,tName:String,columnNames:List[String] ):Boolean = {
    if(admin.tableExists(tName))false
    try
      {val tableDesc = new HTableDescriptor(TableName.valueOf(tName))
      columnNames.foreach(columnName => tableDesc.addFamily(new HColumnDescriptor(columnName)))//添加列
      admin.createTable(tableDesc)
      true}
    catch {case e: Exception => e.printStackTrace();false}
  }

  /**
    * @see 根据表名、rowkey、列族名、列名、值、增加数据、如果增加成功则返回true
    * @param conf HBaseConfiguration
    * @param tableName String 表名
    * @param rowKey String
    * @param columnFamily String 列族名称
    * @param column String列名
    * @param value String 值
    * @return 是否插入成功
    */
  def insertData(conf:Configuration,tableName:String,rowKey:String,columnFamily:String,column:String,value:String):Boolean={
      val  table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value))
//      put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value))
      table.put(put)
    true
  }
  def deleteTable(conf:Configuration,tableName:String):Boolean={
    val admin = getAdmin(conf)
    try{
      if(admin.tableExists(tableName)){
        admin.disableTable(tableName)
        admin.deleteTable(tableName)
      }
    }catch {
      case e:Exception=>e.printStackTrace() ; false
    }
    true
  }

  /**
    * @see 根据指定的配置信息全表扫描指定的表
    * @param conf Configuration 配置信息
    * @param tableName String 表名
    * @return ArrayBuffer[Array[Cell] ]
    */
  def getByScan(conf:Configuration,tableName:String):ArrayBuffer[Array[Cell]]={
    var arrBuffer = ArrayBuffer[Array[Cell]]()

      val scaner = new Scan()
//      val table = new HTable(conf,tableName)
      val  table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
      val results = table.getScanner(scaner)
      var res:Result = results.next()
      while (res != null){
        arrBuffer += res.rawCells()
        res = results.next()
      }
    arrBuffer
  }
  def getRow(conf:Configuration,tableName:String,row:String):Array[Cell]={
//    val table = new HTable(conf,tableName)
    val  table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
    val get = new Get(Bytes.toBytes(row))
    val res = table.get(get)
    res.rawCells()
  }

  /**
    * @see 删除指定表的指定row数据
    * @param conf Configuration
    * @param tableName String 要删除的表名
    * @param row String 要删除的row名
    * @return  Boolean 是否成功
    */
  def delRow(conf:Configuration,tableName:String,row:String):Unit={
    val  table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
    table.delete( new Delete(Bytes.toBytes(row)))

  }

  /**
    * @deprecated
    * @see 更新某个指定的数据
    * @param conf Configuration
    * @param tableName String 要更新的表名
    * @param row String 需要更新的row
    * @param family String 需要更新的列族
    * @param qualifier String 需要修改的列
    * @param compareon String   只能为,不分大小写,EQUAL、GREATER、GREATER_OR_EQUAL、LESS、LESS_OR_EQUAL、NO_OP、NOT_EQUAL
    * @param value String 需要更改的值
    * @param newvalue String
    * @return
    */
  def updateByDelete(conf:Configuration,tableName:String,row:String,family:String,qualifier:String, compareon:String,value:String,newvalue:String):Boolean ={
    if(comp.get(compareon.toUpperCase)== null)false
      val  table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
      val mut = new RowMutations(Bytes.toBytes(row))
      mut.add(new Delete(Bytes.toBytes(row)).addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier)))
      mut.add(new Put(Bytes.toBytes(row)).addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(newvalue)))
      print("-----------" + comp.get(compareon.toUpperCase()))
      table.checkAndMutate(Bytes.toBytes(row),Bytes.toBytes(family),Bytes.toBytes(qualifier),comp.get(compareon.toUpperCase()) ,Bytes.toBytes(value),mut)
  }
  def updateByCover(conf:Configuration,tableName:String,row:String,family:String,qualifier:String, compareon:String,value:String,newvalue:String):Boolean = {
    if(comp.get(compareon.toUpperCase) == null)false
      val table =ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
      val mut = new RowMutations(Bytes.toBytes(row))
      mut.add(new Put(Bytes.toBytes(row)).addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(newvalue)))
      table.checkAndMutate(Bytes.toBytes(row),Bytes.toBytes(family),Bytes.toBytes(qualifier),comp.get(compareon.toUpperCase()) ,Bytes.toBytes(value),mut)
  }
  def findByOldTime(conf:Configuration,tableName:String,row:String,cFamily:String,qualifier:String, timestamp: Long): Result ={
    val table = ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName))
    val get = new Get(Bytes.toBytes(row))
    get.setTimeStamp(timestamp)
    get.addColumn(Bytes.toBytes(cFamily),Bytes.toBytes(qualifier))
    table.get(get)
  }

}

测试用例

package Hbase

import org.apache.hadoop.conf.HbaseUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ConnectionFactory, HBaseAdmin}
import org.apache.hadoop.hbase.executor.ExecutorService
import org.apache.hadoop.hbase.protobuf.generated.CellProtos.KeyValue
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos.{FamilyFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes

import scala.actors.threadpool.Executors

/**
  * Created by Administrator on 2017/8/3.
  */
object test {
  def main(args: Array[String]): Unit = {

//    ExecutorService pool = Executors.newFixedThreadPool(10)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val hBaseUtils = new HbaseUtils()
    val admin = hBaseUtils.getAdmin(conf)
//    val list = List("family1","family2")
//    hBaseUtils.createTable(admin,"test2",list)


//    println(hBaseUtils.insertData(conf,"test2","rowkey1","family1","李四","lisi2"))
//    val row = hBaseUtils.getRow(conf,"test2","rowkey1")
//    row.foreach(a=>{print(new String(a.getRow())+" ");print(a.getTimestamp+" ");print(new String(a.getFamily)+" ");print(new String(a.getValue)+" ")})

//    print(hBaseUtils.delRow(conf,"test2","rowkey1"))
    val all = hBaseUtils.getByScan(conf,"test2")
//    all.foreach(a=>a.foreach( a=>{print(new String(a.getRow())+" ");print(a.getTimestamp+" ")
//      ;print(new String(a.getFamily)+" ");print(new String(a.getValue)+" ")}))
    all.foreach(a=>a.foreach( a=>{print(new String(a.getRowArray,a.getRowOffset,a.getRowLength)+"-->row  ");
      print(a.getTimestamp+"-->timpsstamp  ");
      print(new String(a.getFamilyArray,a.getFamilyOffset,a.getFamilyLength)+"-->family  ");
      println(new String(a.getValueArray,a.getValueOffset,a.getValueLength)+"-->value  "+
        new String(a.getQualifierArray,a.getQualifierOffset,a.getQualifierLength)+ " -->Tags")}))
//    print(hBaseUtils.updateByDelete(conf,"test2","rowkey1","family1","李四","equal","lisi2","小王八蛋"))
//    print(hBaseUtils.updateByCover(conf,"test2","rowkey1","family1","李四","equal","小王八蛋","小王八蛋2"))
//    val row = hBaseUtils.findByOldTime(conf,"test2","rowkey1","family2","李四",1501810811243L).getRow
//    print(row)v
 
  }

}

有有的测试方法被我屏蔽了,大家可以去掉注释进行使用



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase优化技巧、存储 下一篇C# 通过Thrift 1 操作 HBase

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目