MapReduce功能实现系列:
MapReduce功能实现一---HBase和HDFS之间数据相互转换
MapReduce功能实现二---排序
MapReduce功能实现三---Top N
MapReduce功能实现四---小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)
MapReduce功能实现五---去重(Distinct)、计数(Count)
MapReduce功能实现六---最大值(Max)、求和(Sum)、平均值(Avg)
MapReduce功能实现七---小综合(多个job串行处理计算平均值)
MapReduce功能实现八---分区(Partition)
MapReduce功能实现九---Pv、Uv
MapReduce功能实现十---倒排索引(Inverted Index)
MapReduce功能实现十一---join
一、从Hbase表1中读取数据再把统计结果存到表2
在Hbase中建立相应的表1:
-
create 'hello','cf'
-
put 'hello','1','cf:hui','hello world'
-
put 'hello','2','cf:hui','hello hadoop'
-
put 'hello','3','cf:hui','hello hive'
-
put 'hello','4','cf:hui','hello hadoop'
-
put 'hello','5','cf:hui','hello world'
-
put 'hello','6','cf:hui','hello world'
java代码:
-
import java.io.IOException;
-
import java.util.Iterator;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
-
import org.apache.hadoop.hbase.HTableDescriptor;
-
import org.apache.hadoop.hbase.client.HBaseAdmin;
-
import org.apache.hadoop.hbase.client.Put;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.mapreduce.TableReducer;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
public class HBaseToHbase {
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
String hbaseTableName1 = "hello";
-
String hbaseTableName2 = "mytb2";
-
prepareTB2(hbaseTableName2);
-
Configuration conf = new Configuration();
-
Job job = Job.getInstance(conf);
-
job.setJarByClass(HBaseToHbase.class);
-
job.setJobName("mrreadwritehbase");
-
Scan scan = new Scan();
-
scan.setCaching(500);
-
scan.setCacheBlocks(false);
-
TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
-
TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
-
System.exit(job.waitForCompletion(true) 1 : 0);
-
}
-
public static class doMapper extends TableMapper<Text, IntWritable>{
-
private final static IntWritable one = new IntWritable(1);
-
@Override
-
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
-
String rowValue = Bytes.toString(value.list().get(0).getValue());
-
context.write(new Text(rowValue), one);
-
}
-
}
-
public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{
-
@Override
-
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
System.out.println(key.toString());
-
int sum = 0;
-
Iterator<IntWritable> haha = values.iterator();
-
while (haha.hasNext()) {
-
sum += haha.next().get();
-
}
-
Put put = new Put(Bytes.toBytes(key.toString()));
-
put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
-
context.write(NullWritable.get(), put);
-
}
-
}
-
public static void prepareTB2(String hbaseTableName) throws IOException{
-
HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
-
HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
-
tableDesc.addFamily(columnDesc);
-
Configuration cfg = HBaseConfiguration.create();
-
HBaseAdmin admin = new HBaseAdmin(cfg);
-
if (admin.tableExists(hbaseTableName)) {
-
System.out.println("Table exists,trying drop and create!");
-
admin.disableTable(hbaseTableName);
-
admin.deleteTable(hbaseTableName);
-
admin.createTable(tableDesc);
-
} else {
-
System.out.println("create table: "+ hbaseTableName);
-
admin.createTable(tableDesc);
-
}
-
}
-
}
在Linux中执行该代码:
-
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
-
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
-
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase
MapReduce功能实现系列:
MapReduce功能实现一---HBase和HDFS之间数据相互转换
MapReduce功能实现二---排序
MapReduce功能实现三---Top N
MapReduce功能实现四---小综合(从hbase中读取数据统计并在hdfs中降序输出Top 3)
MapReduce功能实现五---去重(Distinct)、计数(Count)
MapReduce功能实现六---最大值(Max)、求和(Sum)、平均值(Avg)
MapReduce功能实现七---小综合(多个job串行处理计算平均值)
MapReduce功能实现八---分区(Partition)
MapReduce功能实现九---Pv、Uv
MapReduce功能实现十---倒排索引(Inverted Index)
MapReduce功能实现十一---join
一、从Hbase表1中读取数据再把统计结果存到表2
在Hbase中建立相应的表1:
-
create 'hello','cf'
-
put 'hello','1','cf:hui','hello world'
-
put 'hello','2','cf:hui','hello hadoop'
-
put 'hello','3','cf:hui','hello hive'
-
put 'hello','4','cf:hui','hello hadoop'
-
put 'hello','5','cf:hui','hello world'
-
put 'hello','6','cf:hui','hello world'
java代码:
-
import java.io.IOException;
-
import java.util.Iterator;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
-
import org.apache.hadoop.hbase.HTableDescriptor;
-
import org.apache.hadoop.hbase.client.HBaseAdmin;
-
import org.apache.hadoop.hbase.client.Put;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.mapreduce.TableReducer;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
public class HBaseToHbase {
-
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-
String hbaseTableName1 = "hello";
-
String hbaseTableName2 = "mytb2";
-
prepareTB2(hbaseTableName2);
-
Configuration conf = new Configuration();
-
Job job = Job.getInstance(conf);
-
job.setJarByClass(HBaseToHbase.class);
-
job.setJobName("mrreadwritehbase");
-
Scan scan = new Scan();
-
scan.setCaching(500);
-
scan.setCacheBlocks(false);
-
TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
-
TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
-
System.exit(job.waitForCompletion(true) 1 : 0);
-
}
-
public static class doMapper extends TableMapper<Text, IntWritable>{
-
private final static IntWritable one = new IntWritable(1);
-
@Override
-
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
-
String rowValue = Bytes.toString(value.list().get(0).getValue());
-
context.write(new Text(rowValue), one);
-
}
-
}
-
public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{
-
@Override
-
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
System.out.println(key.toString());
-
int sum = 0;
-
Iterator<IntWritable> haha = values.iterator();
-
while (haha.hasNext()) {
-
sum += haha.next().get();
-
}
-
Put put = new Put(Bytes.toBytes(key.toString()));
-
put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
-
context.write(NullWritable.get(), put);
-
}
-
}
-
public static void prepareTB2(String hbaseTableName) throws IOException{
-
HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
-
HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
-
tableDesc.addFamily(columnDesc);
-
Configuration cfg = HBaseConfiguration.create();
-
HBaseAdmin admin = new HBaseAdmin(cfg);
-
if (admin.tableExists(hbaseTableName)) {
-
System.out.println("Table exists,trying drop and create!");
-
admin.disableTable(hbaseTableName);
-
admin.deleteTable(hbaseTableName);
-
admin.createTable(tableDesc);
-
} else {
-
System.out.println("create table: "+ hbaseTableName);
-
admin.createTable(tableDesc);
-
}
-
}
-
}
在Linux中执行该代码:
-
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
-
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
-
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase
查看mytb2表:
-
hbase(main):009:0> scan 'mytb2'
-
ROW COLUMN+CELL
-
hello hadoop column=mycolumnfamily:count, timestamp=1489817182454, value=2
-
hello hive column=mycolumnfamily:count, timestamp=1489817182454, value=1
-
hello world column=mycolumnfamily:count, timestamp=1489817182454, value=3
-
3 row(s) in 0.0260 seconds
二、从HBase表1中读取数据再把结果存到HDFS中
查看mytb2表:
-
hbase(main):009:0> scan 'mytb2'
-
ROW COLUMN+CELL
-
hello hadoop column=mycolumnfamily:count, timestamp=1489817182454, value=2
-
hello hive column=mycolumnfamily:count, timestamp=1489817182454, value=1
-
hello world column=mycolumnfamily:count, timestamp=1489817182454, value=3
-
3 row(s) in 0.0260 seconds
有需要的联系我
2317348976
yxxy1717
|