Copyright notice: this is the author's original article and may not be reproduced without the author's permission. https://blog.csdn.net/u013181284/article/details/50775240
Batch Operations on HBase with MapReduce
I. Overview of the Three Access Modes
1. There are three modes in which MapReduce can access HBase:
HBase as an input source (Data Source)
- Besides HBase, the input of a MapReduce job could also come from an RDBMS, HDFS, or another NoSQL store
- Mostly used to analyze data that already lives in HBase and write the results to a non-HBase destination
HBase as an output source (Data Sink)
- The input comes from an RDBMS, HDFS, or a similar source, and the results are written into HBase
- Commonly used for data migration from traditional storage into HBase
HBase as a shared resource (Shared Resource)
- Both the input and the output are HBase tables
- Commonly used when the data is already stored in HBase and the processed intermediate results also need to be kept in HBase
2. Classes that implement the MapReduce API

| Class | Purpose |
| --- | --- |
| TableInputFormat | Formats HBase table data into records that MapReduce can read |
| TableOutputFormat | Converts MapReduce output records into HBase table data |
| MultiTableInputFormat | Formats data from multiple HBase tables into records that MapReduce can read |
| MultiTableOutputFormat | Writes MapReduce output records into multiple HBase tables |
| TableMapper | Extends Mapper; every Mapper that uses HBase as an input source should extend this class |
| TableReducer | Extends Reducer; every Reducer that uses HBase as an output source should extend this class |
| TableMapReduceUtil | Utility class for configuring TableMapper and TableReducer jobs |
| CellCounter | A ready-made job that counts the cells in a table |
| RowCounter | A ready-made job that counts the rows (row keys) in a table |
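Both counter jobs ship with HBase as ready-to-run MapReduce programs, so they can be launched straight from the command line, for example hbase org.apache.hadoop.hbase.mapreduce.RowCounter <table> or hbase org.apache.hadoop.hbase.mapreduce.CellCounter <table> <outputDir> (the exact arguments can differ slightly between HBase versions).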
3. HBase as an Input Source
Data is read from an HBase table, processed with MapReduce, and the results are stored in another medium, such as HDFS.
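The driver below scans a table named member_user for the column info:age, and the mapper only keeps cells whose value is "25", so the example needs such a table with at least one matching row. Here is a minimal sketch for preparing it, written against the same generation of the HBase client API as the rest of the post (HTable, HBaseAdmin and related classes are deprecated in newer releases); the class name PrepareMemberUserTable and the sample row key are illustrative, while the table name, column family, qualifier and value are taken from the example code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/** Creates the member_user table and loads one test row for the example job. */
public class PrepareMemberUserTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Create the table with a single column family "info" if it does not exist yet
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("member_user")) {
            HTableDescriptor desc = new HTableDescriptor("member_user");
            desc.addFamily(new HColumnDescriptor("info"));
            admin.createTable(desc);
        }
        admin.close();

        // Insert one row whose info:age cell carries the value the mapper looks for ("25")
        HTable table = new HTable(conf, "member_user");
        Put put = new Put(Bytes.toBytes("member0001"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("25"));
        table.put(put);
        table.close();
    }
}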
package com.mr.inputsource;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
 * HBase table as the input source.
 *
 * Extends TableMapper: every Mapper that uses HBase as an input source must extend this class.
 *
 * @author shx
 */
public class MemberMapper extends TableMapper<Writable, Writable> {

    private Text k = new Text();
    private Text v = new Text();

    public static final String FIELD_COMMON_SEPARATOR = "\u0001";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    public void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException, InterruptedException {
        String value = null;
        // Row key of the current row
        String rowkey = new String(row.get());
        // Column family of the current cell
        byte[] columnFamily = null;
        // Column qualifier of the current cell
        byte[] columnQualifier = null;
        long ts = 0L;
        try {
            // Iterate over all cells of the row
            for (KeyValue kv : columns.list()) {
                // Cell value, e.g. "25"
                value = Bytes.toStringBinary(kv.getValue());
                // Column family, e.g. "info"
                columnFamily = kv.getFamily();
                // Column qualifier, e.g. "age"
                columnQualifier = kv.getQualifier();
                // Cell timestamp
                ts = kv.getTimestamp();
                if ("25".equals(value)) {
                    k.set(rowkey);
                    v.set(Bytes.toString(columnFamily) + FIELD_COMMON_SEPARATOR + Bytes.toString(columnQualifier)
                            + FIELD_COMMON_SEPARATOR + value + FIELD_COMMON_SEPARATOR + ts);
                    context.write(k, v);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Error:" + e.getMessage() + ",Row:" + Bytes.toString(row.get()) + ",Value:" + value);
        }
    }
}
package com.mr.inputsource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * HBase as the input source: read data from an HBase table, process it with MapReduce,
 * and store the results in HDFS.
 *
 * @author shx
 */
public class Main {

    static final Log LOG = LogFactory.getLog(Main.class);

    // Job name
    public static final String NAME = "Member Test1";
    // HDFS path that receives the job output
    public static final String TEMP_INDEX_PATH = "/tmp/member_user";
    // HBase table used as the input source: member_user
    public static String inputTable = "member_user";

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // 1. Load the HBase configuration
        Configuration conf = HBaseConfiguration.create();
        //conf = new Configuration();

        // 2. Create the Scan object that drives the table scan
        Scan scan = new Scan();
        scan.setBatch(0);
        // Fetch 10000 rows per RPC to the region servers
        scan.setCaching(10000);
        // Return all versions of each cell
        scan.setMaxVersions();
        // Only scan cells written within the last three days
        scan.setTimeRange(System.currentTimeMillis() - 3 * 24 * 3600 * 1000L, System.currentTimeMillis());

        // 3. Restrict the scan to the column family and qualifier to read
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

        // 4. Disable Hadoop speculative execution
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // 5. HDFS path that will hold the data extracted from the HBase table
        Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
        FileSystem fs = FileSystem.get(conf);
        // Delete the path first if it already exists
        if (fs.exists(tmpIndexPath)) {
            fs.delete(tmpIndexPath, true);
        }

        // 6. Create the Job object
        Job job = new Job(conf, NAME);
        // Class that carries the job
        job.setJarByClass(Main.class);
        //job.setMapperClass(MemberMapper.class);

        // Wire up the TableMapper, i.e. initialize MemberMapper:
        // (input table, scan, mapper class, output key class, output value class, job)
        //TableMapReduceUtil.initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan, MemberMapper.class, Text.class, Text.class, job);

        // Map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        // Write the results computed from the HBase table as plain text
        job.setOutputFormatClass(TextOutputFormat.class);
        // HDFS path where the job output is saved
        FileOutputFormat.setOutputPath(job, tmpIndexPath);

        // Run the job
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
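Because TextOutputFormat separates key and value with a tab character, every line this job writes to /tmp/member_user has the form rowkey<TAB>family\u0001qualifier\u0001value\u0001timestamp, which is exactly the line format that the Data Sink example in the next section parses.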
4. HBase as an Output Source
That is, data is read from another medium, processed with MapReduce, and the results are written into an HBase table.
package com.mr.outputsource;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * HBase as the output source: each input line is parsed and turned into a Put
 * that is written to the target HBase table.
 *
 * @author shx
 */
public class MemberMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    // Column family
    private byte[] family = null;
    // Column qualifier
    private byte[] qualifier = null;
    // Cell value
    private byte[] val = null;
    // Row key
    private String rowkey = null;
    // Timestamp
    private long ts = System.currentTimeMillis();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        try {
            String lineString = value.toString();
            // Expected line format: rowkey<TAB>family\u0001qualifier\u0001value\u0001timestamp
            String[] arr = lineString.split("\t", -1);
            if (arr.length == 2) {
                rowkey = arr[0];
                String[] vals = arr[1].split("\u0001", -1);
                if (vals.length == 4) {
                    family = vals[0].getBytes();
                    qualifier = vals[1].getBytes();
                    val = vals[2].getBytes();
                    ts = Long.parseLong(vals[3]);
                    Put put = new Put(rowkey.getBytes(), ts);
                    put.add(family, qualifier, val);
                    context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.mr.outputsource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * HBase as the output source: loads text files from HDFS into an HBase table.
 *
 * @author shx
 */
public class Main extends Configured implements Tool {

    static final Log LOG = LogFactory.getLog(Main.class);

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            LOG.info("Usage: 3 parameters needed! <path> <input> <table>");
            System.exit(1);
        }
        // The first argument is read but not used by this job
        String path = args[0];
        String input = args[1];
        String table = args[2];

        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "Input from file " + input + " into table " + table);
        job.setJarByClass(Main.class);
        job.setMapperClass(MemberMapper.class);
        // Write the mapper output directly into the HBase table
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        for (int i = 0; i < otherArgs.length; i++) {
            System.out.println(otherArgs[i]);
        }
        int res = 1;
        try {
            res = ToolRunner.run(conf, new Main(), otherArgs);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(res);
    }
}
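A quick way to exercise this job: upload a text file in the line format described above to HDFS, make sure the target table already exists in HBase (TableOutputFormat does not create it), and run the driver with its three arguments. The jar name and paths here are placeholders:
hadoop jar hbase-mr-examples.jar com.mr.outputsource.Main /tmp/unused /tmp/member_input member_user
An input line could then look like this (a tab after the row key, the remaining fields joined by \u0001, the timestamp in milliseconds):
member0001<TAB>info\u0001age\u000125\u00011458442800000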
5. HBase as a Shared Resource
That is, HBase serves as both the input source and the output source:
data is read from one HBase table, processed, and the results are written into another HBase table.
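In this mode the reduce side is handled by IdentityTableReducer, which simply writes every value it receives into the configured table, so those values must be Put (or Delete) objects. The mapper below therefore extends TableMapper and, for each row read from the input table, emits a Put carrying the scanned info:age cell so that it can be written to the output table.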
package com.mr.publicsource;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
/**
 * HBase as a shared resource: the mapper reads rows from the input HBase table and
 * re-emits every scanned cell as a Put, which IdentityTableReducer then writes into
 * the output HBase table.
 *
 * @author shx
 */
public class MemberMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException, InterruptedException {
        // Skip rows without any matching cells
        if (columns.isEmpty()) {
            return;
        }
        // Build one Put per row, copying every cell returned by the scan
        // (the driver's Scan only selects info:age)
        Put put = new Put(row.get());
        for (KeyValue kv : columns.list()) {
            // Preserve family, qualifier, timestamp and value of the original cell
            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
        }
        context.write(row, put);
    }
}
package com.mr.publicsource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
/**
 * HBase as a shared resource:
 * the statistics computed over one HBase table are written into another HBase table.
 *
 * @author shx
 */
public class Main {

    static final Log LOG = LogFactory.getLog(Main.class);

    public static final String NAME = "Example Test";
    // Input table
    public static final String inputTable = "member";
    // Output table
    public static final String outputTable = "member-stat";
    public static final String TMP_INDEX_PATH = "/tmp/member2";

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Scan only the info:age column of the input table
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

        // Disable Hadoop speculative execution
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // Clean up the temporary HDFS path if it already exists
        Path tmpIndexPath = new Path(TMP_INDEX_PATH);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(tmpIndexPath)) {
            fs.delete(tmpIndexPath, true);
        }

        Job job = new Job(conf, NAME);
        job.setJarByClass(Main.class);

        // Read from the input table with MemberMapper, which emits Put objects ...
        TableMapReduceUtil.initTableMapperJob(inputTable, scan, MemberMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        // ... and let IdentityTableReducer write those Puts into the output table
        TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job);

        int success = job.waitForCompletion(true) ? 0 : 1;
        System.exit(success);
    }
}
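Once the job finishes, the copied cells can be inspected from the HBase shell with scan 'member-stat'. As with the Data Sink example, both tables must already exist: member with the info column family on the input side, and member-stat with a matching info family on the output side.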