Using HBase with MapReduce
Copyright notice: this is an original post by the author and may not be reproduced without permission. Source: https://blog.csdn.net/u011955252/article/details/50525833

1: This program demonstrates how to write the results produced by a MapReduce job into a specified HBase table.

package count;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


public class WordCount {

    // Mapper: splits each input line into whitespace-separated tokens
    // and emits a (word, 1) pair for every token.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts for each word and writes one HBase Put per word.
    // Row key = the word; the total goes to column family "content", qualifier "count".
    public static class MyReducer extends TableReducer<Text, IntWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }
    // Creates the target table with a single column family, "content";
    // if the table already exists it is dropped and recreated.
    public static void createHbaseTable(String tablename) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // List every ZooKeeper host in a single comma-separated value;
        // setting the same key twice would keep only the last host.
        conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2");
        conf.set("dfs.socket.timeout", "180000");
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor table = new HTableDescriptor(tablename);
        HColumnDescriptor column = new HColumnDescriptor("content");
        table.addFamily(column);
        if (admin.tableExists(tablename)) {
            System.out.println("Table already exists; dropping and recreating it.");
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        admin.createTable(table);
        admin.close();
    }


    public static final String HDFS_PATH = "hdfs://222.27.174.67:9000";
    public static final String INPUT = "/wordcount";

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String tablename = "wordcount";
        // Start from an HBase-aware configuration so TableOutputFormat can reach the cluster.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        createHbaseTable(tablename);

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // No combiner: MyReducer emits (NullWritable, Put), not the (Text, IntWritable)
        // pairs a combiner would have to produce for the reduce phase.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(HDFS_PATH + INPUT));
        // No FileOutputFormat.setOutputPath is needed: TableOutputFormat writes to HBase.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
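
For comparison, the reducer side can also be wired up with TableMapReduceUtil.initTableReducerJob, which sets the reducer class, the output format, and the target table in one call. The sketch below is a minimal alternative driver under that assumption; it reuses MyMapper and MyReducer from above and the same host names, which are examples specific to this cluster.

package count;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() also merges hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2");

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(WordCount.HDFS_PATH + WordCount.INPUT));

        // Sets the reducer class, TableOutputFormat, and the output table in one call.
        TableMapReduceUtil.initTableReducerJob("wordcount", WordCount.MyReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

After the job completes, the stored counts can be read back with the same old-style client API used above. A minimal verification sketch (the word "hello" is only an example row key):

package count;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2");
        HTable table = new HTable(conf, "wordcount");
        // The job stores each count as a string under content:count, keyed by the word.
        Get get = new Get(Bytes.toBytes("hello"));
        Result result = table.get(get);
        byte[] raw = result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
        System.out.println("hello -> " + (raw == null ? "not found" : Bytes.toString(raw)));
        table.close();
    }
}

The same check can be done interactively from the HBase shell with scan 'wordcount'.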
