|
import yeepay.util.HdfsUtil;
import yeepay.util.YeepayConstant;
import java.util.Date;
/**
 * Base driver for jobs that turn text files on HDFS into HFiles with a MapReduce job and then
 * bulk-load those HFiles into an existing HBase table.
 *
 * <p>Subclasses supply the mapper and reducer classes; {@link #run(String[])} wires them into a
 * job configured via {@code HFileOutputFormat.configureIncrementalLoad}.
 */
public abstract class AbstractJobBulkLoad {
    /**
     * Configuration shared by the job, the HTable handles and the bulk loader.
     * Kept public, static and non-final for compatibility with existing callers.
     */
    public static Configuration conf = HBaseConfiguration.create();

    /**
     * Runs the text-to-HFile job and bulk-loads its output into the target table.
     *
     * <p>Output HFiles are written under {@code /output/<tableName>/<currentTimeMillis>}.
     *
     * @param args {@code args[0]} is the HDFS input path, {@code args[1]} the HBase table name;
     *     if fewer than two arguments are given the JVM exits with status -1
     * @throws Exception if the {@code Job} itself cannot be created
     * @throws IllegalStateException if the MapReduce job does not complete successfully
     *     (no bulk load is attempted in that case)
     * @throws RuntimeException wrapping any checked failure during configuration, the job run,
     *     or the bulk load; runtime exceptions are propagated unwrapped
     */
    public void run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("please set input dir and table name");
            System.exit(-1);
            return;
        }
        String txtPath = args[0];
        String tableName = args[1];
        Job job = new Job(conf, "txt2HBase");
        try {
            // Derive the number of reducers and each reducer's row-key range from the
            // table's regions.
            HTable regionTable = new HTable(conf, tableName);
            try {
                HFileOutputFormat.configureIncrementalLoad(job, regionTable);
            } finally {
                regionTable.close();
            }
            // Anchor the job jar on the concrete subclass, which is what provides the
            // mapper and reducer classes.
            job.setJarByClass(getClass());

            FileSystem fs = FileSystem.get(conf);
            System.out.println("input file :" + txtPath);
            Path inputFile = new Path(txtPath);
            if (!fs.exists(inputFile)) {
                System.err.println("inputFile " + txtPath + " not exist.");
                throw new RuntimeException("inputFile " + txtPath + " not exist.");
            }
            FileInputFormat.addInputPath(job, inputFile);

            // Map phase: text lines in, (row key, Text) pairs out.
            job.setMapperClass(getMapperClass());
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);

            // Reduce phase: supplied by the subclass.
            job.setReducerClass(getReducerClass());

            // Timestamped directory so repeated runs do not collide.
            String outputDir = "/output/" + tableName + "/" + System.currentTimeMillis();
            Path output = new Path(outputDir);
            System.out.println(outputDir);
            FileOutputFormat.setOutputPath(job, output);

            if (!job.waitForCompletion(true)) {
                // Never bulk-load the partial output of a failed job.
                throw new IllegalStateException("MapReduce job for table " + tableName
                        + " failed; HFiles in " + outputDir + " were not bulk loaded.");
            }

            // Bulk load: open up permissions on the HFile directories first (HdfsUtil.chmod),
            // then hand the directory to LoadIncrementalHFiles.
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            HTable loadTable = new HTable(conf, tableName);
            try {
                new LoadIncrementalHFiles(conf).doBulkLoad(output, loadTable);
            } finally {
                loadTable.close();
            }
            System.out.println("HFile data load success!");
            System.out.println(getJobName() + " end!");
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the mapper class for the job. Its map output must match what {@link #run}
     * declares: {@code ImmutableBytesWritable} keys and {@code Text} values.
     */
    protected abstract Class getMapperClass();

    /**
     * Returns the reducer class for the job; it is installed after
     * {@code configureIncrementalLoad}, so it replaces any reducer that call set.
     * NOTE(review): presumably it must emit what HFileOutputFormat expects — confirm.
     */
    protected abstract Class getReducerClass();

    /**
     * Returns the subclass's target table name. Not consulted by {@link #run}, which takes
     * the table name from {@code args[1]}.
     */
    protected abstract String getTableName();

    /** Returns a human-readable job name, printed when {@link #run} finishes successfully. */
    protected abstract String getJobName();
}
|