设为首页 加入收藏

TOP

mapreduce编程模型之hbase表作为数据源输入输出
2019-02-25 01:46:02 】 浏览:62
Tags:mapreduce 编程 模型 hbase 作为 数据源 输入 输出
  1. packagecn.luxh.app;
  2. importjava.io.IOException;
  3. importjava.util.StringTokenizer;
  4. importorg.apache.hadoop.conf.Configuration;
  5. importorg.apache.hadoop.hbase.HBaseConfiguration;
  6. importorg.apache.hadoop.hbase.client.Put;
  7. importorg.apache.hadoop.hbase.client.Result;
  8. importorg.apache.hadoop.hbase.client.Scan;
  9. importorg.apache.hadoop.hbase.io.ImmutableBytesWritable;
  10. importorg.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  11. importorg.apache.hadoop.hbase.mapreduce.TableMapper;
  12. importorg.apache.hadoop.hbase.mapreduce.TableReducer;
  13. importorg.apache.hadoop.hbase.util.Bytes;
  14. importorg.apache.hadoop.io.IntWritable;
  15. importorg.apache.hadoop.io.Text;
  16. importorg.apache.hadoop.mapreduce.Job;
  17. /**
  18. *@authorLuxh
  19. *
  20. */
  21. publicclassWordStat{
  22. /**
  23. *TableMapper<Text,IntWritable>Text:输出的key类型,IntWritable:输出的value类型
  24. */
  25. publicstaticclassMyMapperextendsTableMapper<Text,IntWritable>{
  26. privatestaticIntWritableone=newIntWritable(1);
  27. privatestaticTextword=newText();
  28. @Override
  29. protectedvoidmap(ImmutableBytesWritablekey,Resultvalue,
  30. Contextcontext)
  31. throwsIOException,InterruptedException{
  32. //表里面只有一个列族,所以我就直接获取每一行的值
  33. Stringwords=Bytes.toString(value.list().get(0).getValue());
  34. StringTokenizerst=newStringTokenizer(words);
  35. while(st.hasMoreTokens()){
  36. Strings=st.nextToken();
  37. word.set(s);
  38. context.write(word,one);
  39. }
  40. }
  41. }
  42. /**
  43. *TableReducer<Text,IntWritable>Text:输入的key类型,IntWritable:输入的value类型,ImmutableBytesWritable:输出类型
  44. */
  45. publicstaticclassMyReducerextendsTableReducer<Text,IntWritable,ImmutableBytesWritable>{
  46. @Override
  47. protectedvoidreduce(Textkey,Iterable<IntWritable>values,
  48. Contextcontext)
  49. throwsIOException,InterruptedException{
  50. intsum=0;
  51. for(IntWritableva l:values){
  52. sum+=val.get();
  53. }
  54. //添加一行记录,每一个单词作为行键
  55. Putput=newPut(Bytes.toBytes(key.toString()));
  56. //在列族result中添加一个标识符num,赋值为每个单词出现的次数
  57. //String.valueOf(sum)先将数字转化为字符串,否则存到数据库后会变成\x00\x00\x00\x这种形式
  58. //然后再转二进制存到hbase。
  59. put.add(Bytes.toBytes("result"),Bytes.toBytes("num"),Bytes.toBytes(String.valueOf(sum)));
  60. context.write(newImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
  61. }
  62. }
  63. publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{
  64. Configurationconf=HBaseConfiguration.create();
  65. Jobjob=newJob(conf,"wordstat");
  66. job.setJarByClass(Blog.class);
  67. Scanscan=newScan();
  68. //指定要查询的列族
  69. scan.addColumn(Bytes.toBytes("content"),null);
  70. //指定Mapper读取的表为word
  71. TableMapReduceUtil.initTableMapperJob("word",scan,MyMapper.class,Text.class,IntWritable.class,job);
  72.     //指定Reducer写入的表为stat
  73. TableMapReduceUtil.initTableReducerJob("stat",MyReducer.class,job);
  74. System.exit(job.waitForCompletion(true)0:1);
  75. }
  76. }
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBASE列族不能太多的真相 下一篇Flume、Kafka、Hbase、Hive适用场..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目