Data Conversion Between HBase and HDFS
2019-05-12 00:16:08

MapReduce implementation series:

Part 1: Converting data between HBase and HDFS
Part 2: Sorting
Part 3: Top N
Part 4: Mini project (read from HBase, aggregate, and write the Top 3 to HDFS in descending order)
Part 5: Deduplication (Distinct) and counting (Count)
Part 6: Maximum (Max), sum (Sum), and average (Avg)
Part 7: Mini project (chaining multiple jobs to compute an average)
Part 8: Partitioning (Partition)
Part 9: PV and UV
Part 10: Inverted index (Inverted Index)
Part 11: Join

1. Read data from HBase table 1 and write the aggregated counts to table 2

Create and populate table 1 in the HBase shell:


create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'
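
If you want to double-check what was loaded before running the job, a quick look in the HBase shell (not part of the original steps) helps:

scan 'hello'
count 'hello'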


The Java code:


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseToHbase {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "hello";   // source table
        String hbaseTableName2 = "mytb2";   // destination table
        prepareTB2(hbaseTableName2);        // (re)create the destination table

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHbase.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);        // larger scanner caching suits MapReduce scans
        scan.setCacheBlocks(false);  // don't pollute the block cache with a full scan

        // Map over the source table, reduce into the destination table.
        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // exit 0 on success
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Emit the value of the row's first cell (e.g. "hello world") with a count of 1.
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
        }
    }

    public static class doReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString());
            // Sum the occurrences of this line value.
            int sum = 0;
            Iterator<IntWritable> iter = values.iterator();
            while (iter.hasNext()) {
                sum += iter.next().get();
            }
            // Write the total into mytb2 under mycolumnfamily:count, keyed by the line itself.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    // Create the destination table, dropping it first if it already exists.
    public static void prepareTB2(String hbaseTableName) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
        tableDesc.addFamily(columnDesc);
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        if (admin.tableExists(hbaseTableName)) {
            System.out.println("Table exists, trying drop and create!");
            admin.disableTable(hbaseTableName);
            admin.deleteTable(hbaseTableName);
            admin.createTable(tableDesc);
        } else {
            System.out.println("create table: " + hbaseTableName);
            admin.createTable(tableDesc);
        }
        admin.close();
    }
}
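
Note that the HTableDescriptor(String) and HBaseAdmin(Configuration) constructors used in prepareTB2 belong to the older HBase client API and are deprecated in newer releases. As a minimal sketch (not the article's code), the same table-preparation step with the Connection/Admin API, assuming an HBase 1.x or later client on the classpath, could look like this:

// Sketch of prepareTB2 using the HBase 1.x Connection/Admin API; the table
// and column family names match the ones used in the article.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class PrepareTB2 {
    public static void prepareTB2(String hbaseTableName) throws IOException {
        Configuration cfg = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(cfg);
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf(hbaseTableName);
            HTableDescriptor tableDesc = new HTableDescriptor(tn);
            tableDesc.addFamily(new HColumnDescriptor("mycolumnfamily"));
            if (admin.tableExists(tn)) {
                // Drop and recreate the table so each run starts clean.
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }
            admin.createTable(tableDesc);
        }
    }
}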


Compile, package, and run the job on Linux:


[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase
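
For javac and the hadoop jar launcher to resolve the HBase classes, the HBase client jars must be on the classpath; the article does not show how this was configured on the build host. One common approach, assuming the hbase launcher script is installed there, is:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac -cp "$(hbase classpath)" HBaseToHbase.java
[hadoop@h71 q1]$ export HADOOP_CLASSPATH="$(hbase classpath)"
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase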


Check the result table mytb2:


hbase(main):009:0> scan 'mytb2'
ROW              COLUMN+CELL
 hello hadoop    column=mycolumnfamily:count, timestamp=1489817182454, value=2
 hello hive      column=mycolumnfamily:count, timestamp=1489817182454, value=1
 hello world     column=mycolumnfamily:count, timestamp=1489817182454, value=3
3 row(s) in 0.0260 seconds


2. Read data from HBase table 1 and write the results to HDFS
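
A minimal sketch of one way to do this (not the original article's code), assuming the 'hello' table from part 1 and a hypothetical HDFS output directory /tmp/hbasetohdfs: the job reads the table with a TableMapper, sums the counts in an ordinary Reducer, and writes plain-text lines to HDFS with the default TextOutputFormat.

// Sketch: read rows from the 'hello' HBase table and write "<line>\t<count>"
// records to HDFS. The output path /tmp/hbasetohdfs is an example value.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHdfs.class);
        job.setJobName("hbasetohdfs");

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Read from the 'hello' table and emit <line, 1> pairs, as in part 1.
        TableMapReduceUtil.initTableMapperJob("hello", scan, doMapper.class,
                Text.class, IntWritable.class, job);

        // Plain reducer: sum the counts and write text lines to HDFS.
        job.setReducerClass(doReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hbasetohdfs"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // Take the value of the row's first cell, e.g. "hello world".
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
        }
    }

    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}

After the job finishes, the counts should be readable from the output directory, for example with hadoop fs -cat /tmp/hbasetohdfs/part-r-00000.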

