HBase and HDFS Data Interchange Programs




Below is a description of the classes provided by the HBase Java API, what each one does, and how they relate to one another.


1. HBaseConfiguration

Class: org.apache.hadoop.hbase.HBaseConfiguration

Purpose: configures the HBase client.

Usage: Configuration config = HBaseConfiguration.create();

Note: HBaseConfiguration.create() looks for hbase-site.xml on the classpath by default and uses it to initialize the Configuration.
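
When hbase-site.xml is not on the classpath, the ZooKeeper connection settings can also be set in code. A minimal sketch, using the same quorum address and client port that the two examples later in this article use (cluster-specific values, not defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Load hbase-site.xml from the classpath (if present) into a new Configuration.
    Configuration config = HBaseConfiguration.create();
    // Optionally override the ZooKeeper settings in code; these are the
    // placeholder values used by the examples below.
    config.set("hbase.zookeeper.quorum", "192.168.1.139");
    config.set("hbase.zookeeper.property.clientPort", "2191");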

2. HBaseAdmin

Class: org.apache.hadoop.hbase.client.HBaseAdmin

Purpose: provides an interface for managing the tables in an HBase database (creating, deleting, enabling and disabling them, etc.).

Usage: HBaseAdmin admin = new HBaseAdmin(config);
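
Example 1 below uses HBaseAdmin to drop and recreate its target table. A minimal sketch of that pattern, with exception handling omitted ("wordcount" is the table name used in the examples):

    HBaseAdmin admin = new HBaseAdmin(config);
    if (admin.tableExists("wordcount")) {
        // an existing table must be disabled before it can be deleted
        admin.disableTable("wordcount");
        admin.deleteTable("wordcount");
    }
    admin.close();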

3. HTableDescriptor

Class: org.apache.hadoop.hbase.HTableDescriptor

Purpose: HTableDescriptor holds the name of a table together with its column family definitions.

Usage: HTableDescriptor htd = new HTableDescriptor(tablename);

Constructs a table descriptor for the given table name.

htd.addFamily(new HColumnDescriptor("myFamily"));

Adds the column family described by the given descriptor.
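
To actually create the table, the descriptor is handed to HBaseAdmin, as Example 1 below does. A minimal sketch with placeholder names ("myTable", "myFamily"):

    HTableDescriptor htd = new HTableDescriptor("myTable");    // table name
    htd.addFamily(new HColumnDescriptor("myFamily"));          // one column family
    admin.createTable(htd);                                    // create the table via HBaseAdmin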

4. HTable

Class: org.apache.hadoop.hbase.client.HTable

Purpose: HTable handles client communication with a single HBase table.

Usage: HTable tab = new HTable(config, Bytes.toBytes(tablename));

ResultScanner sc = tab.getScanner(Bytes.toBytes("familyName"));

Note: this returns a scanner over all data in the column family familyName of the table.

5. Put

Class: org.apache.hadoop.hbase.client.Put

Purpose: writes data to a single row.

Usage: HTable table = new HTable(config, Bytes.toBytes(tablename));

Put put = new Put(row);

put.add(family, qualifier, value);

Note: adds the cell specified by family, qualifier and value to the row that will be written to the table tablename.
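
Note that the snippet above only builds the Put; it still has to be submitted with HTable.put(). A minimal end-to-end sketch, where the table, row key, family, qualifier and value are all placeholders:

    HTable table = new HTable(config, Bytes.toBytes("myTable"));
    Put put = new Put(Bytes.toBytes("row1"));            // row key
    put.add(Bytes.toBytes("myFamily"),                   // column family
            Bytes.toBytes("myQualifier"),                // column qualifier
            Bytes.toBytes("myValue"));                   // cell value
    table.put(put);                                      // write the row to HBase
    table.close();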

6. Get

Class: org.apache.hadoop.hbase.client.Get

Purpose: reads the data of a single row.

Usage: HTable table = new HTable(config, Bytes.toBytes(tablename));

Get get = new Get(Bytes.toBytes(row));

Result result = table.get(get);

Note: fetches the data stored in row row of the table tablename.
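
To read a value out of the returned Result, the cell is addressed by family and qualifier. A minimal sketch with placeholder names:

    Result result = table.get(new Get(Bytes.toBytes("row1")));
    byte[] raw = result.getValue(Bytes.toBytes("myFamily"), Bytes.toBytes("myQualifier"));
    String value = (raw == null) ? null : Bytes.toString(raw);   // null if the cell does not exist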

7. ResultScanner

Class: org.apache.hadoop.hbase.client.ResultScanner (an interface)

Purpose: interface for iterating over scan results.

Usage: ResultScanner scanner = table.getScanner(Bytes.toBytes(family));

for (Result rowResult : scanner) {
    byte[] str = rowResult.getValue(family, column);
}

Note: loops over the rows returned by the scan and reads a column value from each row.
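
A slightly fuller sketch of the same scan, including closing the scanner when finished (family and qualifier names are placeholders):

    ResultScanner scanner = table.getScanner(Bytes.toBytes("myFamily"));
    try {
        for (Result rowResult : scanner) {
            byte[] raw = rowResult.getValue(Bytes.toBytes("myFamily"), Bytes.toBytes("myQualifier"));
            System.out.println(Bytes.toString(rowResult.getRow()) + " -> "
                    + (raw == null ? "" : Bytes.toString(raw)));
        }
    } finally {
        scanner.close();   // always release the scanner
    }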



Example 1: Reading data from HDFS and writing it into HBase

package org.hadoop.hbase;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountHbaseWriter {

    public static class WordCountHbaseMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit <word, one>
            }
        }
    }

    public static class WordCountHbaseReducer extends
            TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) { // sum the counts
                sum += val.get();
            }
            Put put = new Put(key.getBytes()); // one row per word
            // column family "content", qualifier "count", value = the count
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            // emit the summed result as <rowkey, Put>
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }

    public static void main(String[] args) {
        String tablename = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.139");
        conf.set("hbase.zookeeper.property.clientPort", "2191");
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(conf);
            if (admin.tableExists(tablename)) {
                System.out.println("table exists! recreating.......");
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            HTableDescriptor htd = new HTableDescriptor(tablename);
            HColumnDescriptor tcd = new HColumnDescriptor("content");
            htd.addFamily(tcd);      // add the column family
            admin.createTable(htd);  // create the table
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 1) {
                System.err.println("Usage: WordCountHbaseWriter <in>");
                System.exit(2);
            }
            Job job = new Job(conf, "WordCountHbaseWriter");
            job.setNumReduceTasks(2);
            job.setJarByClass(WordCountHbaseWriter.class);
            // WordCountHbaseMapper handles the map phase
            job.setMapperClass(WordCountHbaseMapper.class);
            TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);
            // input path of the job
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            // map output key type is Text
            job.setOutputKeyClass(Text.class);
            // map output value type is IntWritable
            job.setOutputValueClass(IntWritable.class);
            // run the job and exit with its status
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


Example 2: Reading data from HBase and writing it to HDFS

package org.hadoop.hbase;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountHbaseReader {

    public static class WordCountHbaseReaderMapper extends
            TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer("");
            // each row of the wordcount table holds a single column content:count
            for (Entry<byte[], byte[]> entry : value.getFamilyMap("content".getBytes()).entrySet()) {
                String str = new String(entry.getValue()); // convert the cell value to a String
                if (str != null) {
                    sb.append(new String(entry.getKey()));
                    sb.append(":");
                    sb.append(str);
                }
                context.write(new Text(key.get()), new Text(new String(sb)));
            }
        }
    }

    public static class WordCountHbaseReaderReduce extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String tablename = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.139");
        conf.set("hbase.zookeeper.property.clientPort", "2191");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 1) {
            System.err.println("Usage: WordCountHbaseReader <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(WordCountHbaseReader.class);
        // output path of the job
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan,
                WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
        // run the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


The programs need the relevant Hadoop JAR packages as well as all of the HBase JAR packages on the classpath.


If the API calls above do not cover what you need, the following site has an introduction to the full HBase API:

http://www.yiibai.com/hbase/


This article originally appeared on the "点滴积累" blog; please keep this attribution when reposting: http://tianxingzhe.blog.51cto.com/3390077/1650856
