Using MapReduce to Access HBase: Three Access Modes
Copyright notice: this is the blogger's original article and may not be reproduced without permission. https://blog.csdn.net/u013181284/article/details/50775240

Batch Operations on HBase with MapReduce
I. Overview of the Three Access Modes
1. There are three modes for accessing HBase with MapReduce:
HBase as the input source (Data Source)
In general the input source could also be an RDBMS, HDFS, or another NoSQL store.
Mostly used to analyze data that already sits in HBase and write the results to a destination other than HBase.
HBase as the output source (Data Sink)
The input may come from an RDBMS or HDFS, and the output is written into HBase.
Commonly used for data migration from traditional storage into HBase.
HBase as a shared resource (Shared Resource)
Both the input source and the output destination are HBase.
Used when the data is already stored in HBase and the processed intermediate results still need HBase as storage.

2. MapReduce API classes

Class — Purpose
TableInputFormat — formats the data in an HBase table into records that MapReduce can read
TableOutputFormat — converts MapReduce output records into HBase table data
MultiTableInputFormat — formats the data of multiple HBase tables into records that MapReduce can read
MultiTableOutputFormat — writes MapReduce output records into multiple HBase tables
TableMapper — extends Mapper; every Mapper that uses HBase as the input source must extend this class
TableReducer — extends Reducer; every Reducer that uses HBase as the output source must extend this class
TableMapReduceUtil — utility class for configuring TableMapper and TableReducer jobs
CellCounter — a job that counts the cells in a table
RowCounter — a job that counts the row keys in a table
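
The table lists MultiTableInputFormat and MultiTableOutputFormat, which none of the examples below use. As a rough sketch (not from the original article; the package, the class name, the tables table_a/table_b and the info:value column are invented for illustration), a mapper that writes to several tables through MultiTableOutputFormat simply keys each Put with the name of its destination table:

package com.mr.multitable;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Illustrative mapper for MultiTableOutputFormat: the output key carries the
 * name of the destination table and the output value is the Put for that table.
 */
public class MultiTableMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

	private static final ImmutableBytesWritable TABLE_A = new ImmutableBytesWritable(Bytes.toBytes("table_a"));
	private static final ImmutableBytesWritable TABLE_B = new ImmutableBytesWritable(Bytes.toBytes("table_b"));

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] arr = value.toString().split("\t", -1);
		if (arr.length != 2) {
			return;
		}
		Put put = new Put(Bytes.toBytes(arr[0]));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(arr[1]));

		// the output key decides which HBase table receives this Put
		ImmutableBytesWritable target = arr[0].startsWith("a") ? TABLE_A : TABLE_B;
		context.write(target, put);
	}
}

In the driver, job.setOutputFormatClass(MultiTableOutputFormat.class) takes the place of the single-table TableOutputFormat; no OUTPUT_TABLE property is needed because every record names its own table.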

3. HBase as the Input Source
Data is read from an HBase table, processed with MapReduce, and the results are stored in another medium, such as HDFS.

package com.mr.inputsource;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * 
 * @author shx
 *
 *	An HBase table as the input source.
 *  TableMapper extends Mapper; every Mapper that reads from HBase must extend it.
 */

public class MemberMapper extends TableMapper<Writable, Writable>{
	
	private Text k = new Text();
	private Text v = new Text();
	
	public static final String FIELD_COMMON_SEPARATOR="\u0001";
	
	@Override 
	protected void setup(Context context) throws IOException ,InterruptedException {	
		
	}
	
	@Override
	public void map(ImmutableBytesWritable row, Result columns, 
			Context context) throws IOException ,InterruptedException {
		
		String value = null;
		
		// row key of the current row
		String rowkey = new String(row.get());
		
		// column family of the current cell
		byte[] columnFamily = null;
		
		// column qualifier of the current cell
		byte[] columnQualifier = null;
		
		long ts = 0L;
		
		try{
			// iterate over all cells (KeyValues) in the row
			for(KeyValue kv : columns.list()){
				
				// value of the cell, e.g. "25"
				value = Bytes.toStringBinary(kv.getValue());
				
				// column family of the cell, e.g. "info"
				columnFamily = kv.getFamily();
				
				// column qualifier of the cell
				columnQualifier = kv.getQualifier();
				
				// timestamp of the cell
				ts = kv.getTimestamp();
				
				if("25".equals(value)){
					k.set(rowkey);
					v.set(Bytes.toString(columnFamily)+FIELD_COMMON_SEPARATOR+Bytes.toString(columnQualifier)
							+FIELD_COMMON_SEPARATOR+value+FIELD_COMMON_SEPARATOR+ts);
					context.write(k, v);
					break;
				}
			}
		}catch(Exception e){
			e.printStackTrace();
			System.err.println("Error:"+e.getMessage()+",Row:"+Bytes.toString(row.get())+",Value"+value);
		}
		
	}

}

package com.mr.inputsource;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 *
 * @author shx
 *
 * HBase as the input source: read data from an HBase table, process it with
 * MapReduce, and store the results in HDFS.
 *
 */
public class Main {

	static final Log LOG = LogFactory.getLog(Main.class);

	// job name
	public static final String NAME = "Member Test1";

	// HDFS output path
	public static final String TEMP_INDEX_PATH = "/tmp/member_user";

	// the HBase table used as the input source
	public static String inputTable = "member_user";

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {

		// 1. get the HBase configuration
		Configuration conf = HBaseConfiguration.create();
		//conf = new Configuration();

		// 2. create a Scan object that covers the whole table
		Scan scan = new Scan();
		scan.setBatch(0);
		scan.setCaching(10000);
		scan.setMaxVersions();
		scan.setTimeRange(System.currentTimeMillis() - 3*24*3600*1000L, System.currentTimeMillis());

		// 3. restrict the scan to one column family and qualifier
		scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

		// 4. disable Hadoop speculative execution for map and reduce tasks
		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
		conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

		// 5. HDFS path that will hold the data exported from the HBase table
		Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
		FileSystem fs = FileSystem.get(conf);
		// delete the path first if it already exists
		if(fs.exists(tmpIndexPath)){
			fs.delete(tmpIndexPath, true);
		}

		// 6. create the job
		Job job = new Job(conf, NAME);

		// class that carries the job
		job.setJarByClass(Main.class);
		//job.setMapperClass(MemberMapper.class);

		// configure the TableMapper, i.e. initialize MemberMapper:
		// (input table, scanner, mapper class, output key class, output value class, job)
		//TableMapReduceUtil.initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job);
		TableMapReduceUtil.initTableMapperJob(inputTable, scan, MemberMapper.class, Text.class, Text.class, job);

		// map-only job: no reduce tasks
		job.setNumReduceTasks(0);

		// write the results of the MapReduce computation as plain text
		job.setOutputFormatClass(TextOutputFormat.class);

		// HDFS path for the job output
		FileOutputFormat.setOutputPath(job, tmpIndexPath);

		// run the job and wait for it to finish
		boolean success = job.waitForCompletion(true);
		System.exit(success ? 0 : 1);
	}

}
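
Each line this job writes to /tmp/member_user is the row key, a tab, and then the column family, qualifier, value and timestamp joined by the \u0001 separator defined in MemberMapper. A hypothetical output line (row key and timestamp invented for illustration, with \u0001 standing in for the unprintable separator byte) looks like:

m0001	info\u0001age\u000125\u00011456371234567

This is exactly the line format that the output-source mapper in the next section splits apart again.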


4. HBase as the Output Source
Data is read from another medium (here, text files on HDFS), processed with MapReduce, and the results are written into an HBase table.

package com.mr.outputsource;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * HBase as the output source: each input text line is parsed and converted
 * into a Put that is written to the target HBase table.
 */
public class MemberMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	
	
	private byte[] family = null;
	
	private byte[] qualifier = null;
	private byte[] val = null;
	private String rowkey = null;
	private long ts = System.currentTimeMillis();
	
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
		try{
			String lineString = value.toString();
			String[] arr = lineString.split("\t",-1);
			if(arr.length==2){
			        rowkey = arr[0];
				String[] vals = arr[1].split("\u0001", -1);
				if(vals.length==4){
					family = vals[0].getBytes();
					qualifier = vals[1].getBytes();
					val = vals[2].getBytes();
					ts = Long.parseLong(vals[3]);
					
					Put put = new Put(rowkey.getBytes(),ts);
					put.add(family, qualifier, val);
					context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
				}
				
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}

}

package com.mr.outputsource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 *
 * @author shx
 *
 * HBase as the output source: read text files from HDFS and write the parsed
 * records into an HBase table.
 */
public class Main extends Configured implements Tool{

	static final Log LOG = LogFactory.getLog(Main.class);

	@Override
	public int run(String[] args) throws Exception {
		if(args.length != 3){
			LOG.info("Usage: 3 parameters needed!");
			System.exit(1);
		}
		String path = args[0]; // accepted but not used by this job
		String input = args[1];
		String table = args[2];
		Configuration conf = HBaseConfiguration.create();

		Job job = new Job(conf, "Input from file " + input + " into table " + table);
		job.setJarByClass(Main.class);
		job.setMapperClass(MemberMapper.class);

		// write the mapper output directly into HBase
		job.setOutputFormatClass(TableOutputFormat.class);

		// name of the HBase table that receives the output
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);

		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Writable.class);

		// map-only job: no reduce tasks
		job.setNumReduceTasks(0);

		FileInputFormat.addInputPath(job, new Path(input));

		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

		for(int i = 0; i < otherArgs.length; i++){
			System.out.println(otherArgs[i]);
		}

		int res = 1;
		try{
			res = ToolRunner.run(conf, new Main(), otherArgs);
		}catch(Exception e){
			e.printStackTrace();
		}
		System.exit(res);
	}
}
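
The driver above configures TableOutputFormat, the OUTPUT_TABLE property and the output key/value classes by hand. The same wiring can also be done through TableMapReduceUtil from the class table above; a minimal sketch of the equivalent calls inside run(), assuming TableMapReduceUtil is imported (this driver does not import it yet):

		// sets TableOutputFormat, the target table and the output key/value classes in one call;
		// passing null as the reducer class leaves this a map-only job
		TableMapReduceUtil.initTableReducerJob(table, null, job);
		job.setNumReduceTasks(0);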



5. HBase as a Shared Resource
HBase acts as both the input source and the output destination:
data is read from one HBase table, processed, and the results are written into another HBase table.

package com.mr.publicsource;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class MemberMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	
	// column family
	private byte[] family = null;
	// column qualifier
	private byte[] qualifier = null;
	// cell value
	private byte[] val = null;
	// row key
	private String rowkey = null;
	// timestamp
	private long ts = System.currentTimeMillis();
	
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
		try{
			String lineString = value.toString();
			String[] arr = lineString.split("\t",-1);
			if(arr.length==2){
				// assign the row key
				rowkey = arr[0];
				String[] vals = arr[1].split("\u0001", -1);
				if(vals.length==4){
					family = vals[0].getBytes();
					qualifier = vals[1].getBytes();
					val = vals[2].getBytes();
					ts = Long.parseLong(vals[3]);
					
					Put put = new Put(rowkey.getBytes(), ts);
					put.add(family, qualifier, val);
					context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
				}
				
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}

}

package com.mr.publicsource;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.FileSystem;
import com.mr.inputsource.MemberMapper;

/**
 * HBase as a shared resource:
 * the results computed from one HBase table are written into another HBase table.
 * @author shx
 *
 */
public class Main {
	
	static final Log LOG = LogFactory.getLog(Main.class);
	
	public static final String NAME = "Example Test";
	public static final String inputTable = "member";
	public static final String outputTable = "member-stat";
	
	public static final String TMP_INDEX_PATH = "/tmp/member2";

	/**
	 * @param args
	 */
	public static void main(String[] args)throws Exception {
		Configuration conf = HBaseConfiguration.create();
		
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
		
		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
		conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
		
		
		Path tmpIndexPath = new Path(TMP_INDEX_PATH);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(tmpIndexPath)){
			fs.delete(tmpIndexPath, true);
		}
		
		
		Job job = new Job(conf, NAME);
		job.setJarByClass(Main.class);
		job.setMapperClass(com.mr.publicsource.MemberMapper.class);
		
		// read the data from HBase with the TableMapper; note that initTableMapperJob
		// sets the mapper class, so the com.mr.inputsource.MemberMapper imported above is the one that runs
		TableMapReduceUtil.initTableMapperJob(inputTable, scan, MemberMapper.class, Text.class, Text.class, job);
		
		// IdentityTableReducer writes the map output values straight into the output table
		TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job);
		
		int success = job.waitForCompletion(true) ? 0 : 1;
		System.exit(success);
	}

}
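
IdentityTableReducer forwards the map output values unchanged to TableOutputFormat, so in a shared-source job the map output values need to be Put (or Delete) objects rather than Text. A minimal sketch of such a TableMapper, not part of the original article (the class name StatMapper is invented, and it simply copies cells; a real job would aggregate or transform them):

package com.mr.publicsource;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

/**
 * Sketch of a shared-source mapper: reads cells from the input HBase table and
 * emits Put objects, which IdentityTableReducer writes unchanged into the
 * output table configured by initTableReducerJob.
 */
public class StatMapper extends TableMapper<ImmutableBytesWritable, Put> {

	@Override
	protected void map(ImmutableBytesWritable row, Result columns, Context context)
			throws IOException, InterruptedException {
		for (KeyValue kv : columns.list()) {
			// copy the cell into the output table under the same row key;
			// a real job would compute a statistic here instead
			Put put = new Put(row.get());
			put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
			context.write(row, put);
		}
	}
}

With such a mapper, the initTableMapperJob call in the driver would pass ImmutableBytesWritable.class and Put.class as the map output key and value classes.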






