设为首页 加入收藏

TOP

Hbase Mapreduce示例:全库扫描(大量数据)
2018-12-07 02:00:04 】 浏览:173
Tags:Hbase Mapreduce 示例 全库 扫描 大量 数据
package com.hbase.mapreduce;


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.GenericOptionsParser;


import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.util.Bytes;


import com.goodhope.utils.ColumnUtils;


public class ExportHbase {
private static final String INFOCATEGORY = "info:storecategory";


private static final String USAGE = "Usage: ExportHbase " +
"-r <numReduceTasks> -indexConf <iconfFile>\n" +
"-indexDir <indexDir> -webSite <amazon> [-needupdate <true> -isVisible -startTime <long>] -table <tableName> -columns <columnName1> " +
"[<columnName2> ...]";


/**
* Prints the usage message and exists the program.
*
* @param message The message to print first.
*/
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
throw new RuntimeException(USAGE);
}


/**
* Creates a new job.
* @param conf
*
* @param args The command line arguments.
* @throws IOException When reading the configuration fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
if (args.length < 7) {
printUsage("Too few arguments");
}


int numReduceTasks = 1;
String iconfFile = null;
String indexDir = null;
String tableName = null;
String website = null;
String needupdate = "";
String expectShopGrade = "";
String dino = "6";
String isdebug = "0";
long debugThreshold = 10000;
String debugThresholdStr = Long.toString(debugThreshold);
String queue = "offline";


long endTime = Long.MAX_VALUE;
int maxversions = 1;
long startTime = System.currentTimeMillis() - 28*24*60*60*1000l;
long distartTime = System.currentTimeMillis() - 30*24*60*60*1000l;
long diusedTime = System.currentTimeMillis() - 30*24*60*60*1000l;
String startTimeStr = Long.toString(startTime);
String diusedTimeStr = Long.toString(diusedTime);
String quorum = null;


String isVisible = "";
List<String> columns = new ArrayList<String>() ;


boolean bFilter = false;


// parse args
for (int i = 0; i < args.length - 1; i++) {
if ("-r".equals(args[i])) {
numReduceTasks = Integer.parseInt(args[++i]);
} else if ("-indexConf".equals(args[i])) {
iconfFile = args[++i];
} else if ("-indexDir".equals(args[i])) {
indexDir = args[++i];
} else if ("-table".equals(args[i])) {
tableName = args[++i];
} else if ("-webSite".equals(args[i])) {
website = args[++i];
} else if ("-startTime".equals(args[i])) {
startTimeStr = args[++i];
startTime = Long.parseLong(startTimeStr);
} else if ("-needupdate".equals(args[i])) {
needupdate = args[++i];
} else if ("-isVisible".equals(args[i])) {
isVisible = "true";
} else if ("-shopgrade".equals(args[i])) {
expectShopGrade = args[++i];
} else if ("-queue".equals(args[i])) {
queue = args[++i];
} else if ("-dino".equals(args[i])) {
dino = args[++i];
} else if ("-maxversions".equals(args[i])) {
maxversions = Integer.parseInt(args[++i]);
} else if ("-distartTime".equals(args[i])) {
distartTime = Long.parseLong(args[++i]);
} else if ("-diendTime".equals(args[i])) {
endTime = Long.parseLong(args[++i]);
} else if ("-diusedTime".equals(args[i])) {
diusedTimeStr = args[++i];
diusedTime = Long.parseLong(diusedTimeStr);
} else if ("-quorum".equals(args[i])) {
quorum = args[++i];
} else if ("-filter".equals(args[i])) {
bFilter = true;
} else if ("-columns".equals(args[i])) {
columns.add(args[++i]);
while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
String columnname = args[++i];
columns.add(columnname);
System.out.println("args column----: " + columnname);
}
} else if ("-debugThreshold".equals(args[i])) {
isdebug = "1";
debugThresholdStr = args[++i];
debugThreshold = Long.parseLong( debugThresholdStr );
}
else {
printUsage("Unsupported option " + args[i]);
}
}


if (distartTime > endTime) {
printUsage("distartTime must <= diendTime");
}


if (indexDir == null || tableName == null || columns.isEmpty()) {
printUsage("Index directory, table name and at least one column must " +
"be specified");
}


if (iconfFile != null) {
// set index configuration content from a file
String content = readContent(iconfFile);
conf.set("hbase.index.conf", content);
conf.set("hbase.website.name", website);
conf.set("hbase.needupdate.productDB", needupdate);
conf.set("hbase.expect.shopgrade", expectShopGrade);
conf.set("hbase.di.no", dino);
conf.set("hbase.expect.item.visible", isVisible);
conf.set("hbase.index.startTime", startTimeStr);
conf.set("hbase.index.diusedTime", diusedTimeStr);
conf.set("hbase.index.debugThreshold", debugThresholdStr);
conf.set("hbase.index.debug", isdebug);
if (quorum != null) {
conf.set("hbase.zookeeper.quorum", quorum);
}
String temp = "";
for (String column : columns) {
temp = temp + column + "|";
}
temp = temp.substring(0, temp.length() - 1);
conf.set("hbase.index.column", temp);
System.out.println("hbase.index.column: " + temp);
}




Job job = new Job(conf, "export data from table " + tableName);
((JobConf) job.getConfiguration()).setQueueName(queue);


// number of indexes to partition into
job.setNumReduceTasks(numReduceTasks);
Scan scan = new Scan();
scan.setCacheBlocks(false);


// limit scan range
scan.setTimeRange(distartTime, endTime);
// scan.setMaxVersions(maxversions);
scan.setMaxVersions(1);


/* limit scan columns */
for (String column : columns) {
scan.addColumn(ColumnUtils.getFamily(column), ColumnUtils.getQualifier(column));
scan.addFamily(ColumnUtils.getFamily(column));
}


// set filter
if( bFilter ){
System.out.println("only export guangtaobao data. ");
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("info"),
Bytes.toBytes("producttype"),
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("guangtaobao")) );
filter.setFilterIfMissing(true);
scan.setFilter(filter);
}


TableMapReduceUtil.initTableMapperJob(tableName, scan, ExportHbaseMapper.class,
Text.class, Text.class, job);
// job.setReducerClass(ExportHbaseReducer.class);
FileOutputFormat.setOutputPath(job, new Path(indexDir));




return job;
}


/**
* Reads xml file of indexing configurations. The xml format is similar to
* hbase-default.xml and hadoop-default.xml. For an example configuration,
* see the <code>createIndexConfContent</code> method in TestTableIndex.
*
* @param fileName The file to read.
* @return XML configuration read from file.
* @throws IOException When the XML is broken.
*/
private static String readContent(String fileName) throws IOException {
File file = new File(fileName);
int length = (int) file.length();
if (length == 0) {
printUsage("Index configuration file " + fileName + " does not exist");
}


int bytesRead = 0;
byte[] bytes = new byte[length];
FileInputStream fis = new FileInputStream(file);


try {
// read entire file into content
while (bytesRead < length) {
int read = fis.read(bytes, bytesRead, length - bytesRead);
if (read > 0) {
bytesRead += read;
} else {
break;
}
}
} finally {
fis.close();
}


return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
}


/**
* The main entry point.
*
* @param args The command line arguments.
* @throws Exception When running the job fails.
*/
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) 0 : 1);
}


}


//////////////////////////////////////////////////////////


package com.hbase.mapreduce;


import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.lang.String;
import java.lang.StringBuffer;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;


import com.goodhope.utils.ColumnUtils;




/**
* Pass the given key and record as-is to the reduce phase.
*/
@SuppressWarnings("deprecation")
public class ExportHbaseMapper extends TableMapper<Text,Text> implements Configurable {
private static final Text keyTEXT = new Text();
private static final Text SENDTEXT = new Text();


private Configuration conf = null;


private long startTime = 0;
List<String> columnMap = null;


private long rCount = 0;
private long errCount = 0;
private int debug = 0;
private long thresCount = 10000;


public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {


rCount++;


String itemid = Bytes.toString(key.get());
if (itemid.contains("&")) {
context.getCounter("Error", "rowkey contains \"&\"").increment(1);
return;
}


StringBuffer outstr = new StringBuffer();
for (String col : columnMap) {


String tmp = Bytes.toString(value.getValue(ColumnUtils.getFamily(col), ColumnUtils.getQualifier(col)));
if (tmp == null){
context.getCounter("Error", col+" No value in hbase").increment(1);

errCount++;
if( debug > 0 && (errCount % thresCount == 0)){
System.err.println( itemid + ": doesn't has " + col + " data!");
}


outstr.append("NULL" + "\t");
}else{
if( tmp.contains("guangtaobao") ){
outstr.append("1" + "\t");
}else{
outstr.append(tmp.trim() + "\t");
}
}
}


if ( ! outstr.toString().isEmpty() ) {


SENDTEXT.set( outstr.toString() );
keyTEXT.set(itemid);
context.write(keyTEXT, SENDTEXT);


if( debug > 0 && (rCount % thresCount*10000 == 0)){
System.out.println( SENDTEXT.toString() + keyTEXT.toString() );
}
}
else
{
context.getCounter("Error", "No Colume output").increment(1);
return;
}
}


/**
* Returns the current configuration.
*
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
@Override
public Configuration getConf() {
return conf;
}


/**
* Sets the configuration. This is used to set up the index configuration.
*
* @param configuration
* The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
*/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;

startTime = Long.parseLong(conf.get("hbase.index.startTime"));
thresCount = Long.parseLong(conf.get("hbase.index.debugThreshold"));
debug = Integer.parseInt(conf.get("hbase.index.debug"));


String[] columns = conf.get("hbase.index.column").split("\\|");


columnMap = new ArrayList<String>();
for (String column : columns) {
System.out.println("Output column: " + column);


columnMap.add(column);
}


}


}




//////////////////////////////////////////////////////////


package com.hbase.utils;


import org.apache.hadoop.hbase.util.Bytes;


public class ColumnUtils {


public static byte[] getFamily(String column){
return getBytes(column, 0);
}


public static byte[] getQualifier(String column){
return getBytes(column, 1);
}


private static byte[] getBytes(String column , int offset){
String[] split = column.split(":");
return Bytes.toBytes(offset > split.length -1 split[0] :split[offset]);
}
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hbase教程2(cdh hue访问) 下一篇spark使用java读取hbase数据做分..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目