设为首页 加入收藏

TOP

mapping-USER-to-MOBILE2CMPAYID-20180109
2019-03-07 13:40:49 】 浏览:70
Tags:mapping-USER-to-MOBILE2CMPAYID-20180109
1,在hbase shell中创建HBase表MOBILE2CMPAYID
(请参考"hbase建表流程说明.pdf"文件)
1.1,在任意目录下输入下面2条Linux命令,进入hbase shell
cd /root
hbase shell
1.2,进入hbase shell之后,
如果是第一次创建,直接输入下面1条hbase命令即可
hbase(main):001:0> create 'MOBILE2CMPAYID','cf'
如果不是第一次创建,则首先判断hbase表MOBILE2CMPAYID是否存在
hbase(main):002:0> exists 'MOBILE2CMPAYID'
如果存在,则删除,重建,
hbase(main):002:0> disable 'MOBILE2CMPAYID'
hbase(main):003:0> drop 'MOBILE2CMPAYID'
hbase(main):004:0> create 'MOBILE2CMPAYID','cf'
如果不存在,则直接重建
hbase(main):005:0> create 'MOBILE2CMPAYID','cf'

2,在spark shell中运行以下脚本
2.1, 在任意目录下输入下面2条Linux命令,进入spark shell
cd /root
spark-shell
2.2,进入spark shell之后,粘贴下面全部代码到 scala>  中即可,等待执行完成.

def readHbase(sc:org.apache.spark.SparkContext,readTableName:String="USER") ={
val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","s1sl11,s1ma11,s1sl22")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,readTableName)
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
import org.apache.hadoop.hbase.util.Bytes
val m2cRDD=hbaseRDD.map(
r=>{
val mobile_no:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("mobile_no")))
val user_id:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("user_id")))
val is_validated:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("is_validated")))
if(is_validated == "true" && mobile_no != null && user_id != null ){
(mobile_no,user_id,is_validated)
}else{
("-1","-1","-1")
}
}
)
m2cRDD
}
def putWriteHbase(sc:org.apache.spark.SparkContext,m2cRDD:org.apache.spark.rdd.RDD[(String, String,String)],writeTableName:String="MOBILE2CMPAYID")={
m2cRDD.foreachPartition(
iter => {
val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","s1sl11,s1ma11,s1sl22")
val hbaseTable = new org.apache.hadoop.hbase.client.HTable(hbaseConf,writeTableName)
iter.foreach(
row => {
import org.apache.hadoop.hbase.util.Bytes
val mobile_no:String=row._1
val user_id:String=row._2
val is_validated:String=row._3
if(mobile_no != "-1"){
val put = new org.apache.hadoop.hbase.client.Put(Bytes.toBytes(mobile_no))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("mobile_no"),Bytes.toBytes(mobile_no))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("user_id"),Bytes.toBytes(user_id))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("is_validated"),Bytes.toBytes(is_validated))
hbaseTable.put(put)
}
}
)
hbaseTable.close()
}
)
}
val m2cRDD=readHbase(sc,"USER")
putWriteHbase(sc,m2cRDD,"MOBILE2CMPAYID")
println("ok, Finished")

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase目录 下一篇Hbase   snapshot

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目