To batch-insert data into HBase from MapReduce, you only need the table name, the column family, and the column qualifiers, plus an HBase connection. The job below reads tab-separated city records from HDFS and writes them into the CITY_INFO table.
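The driver obtains its Configuration from a project utility class, HbaseUtilJava, whose source is not shown in this post. A minimal sketch of what such a helper might look like, assuming the standard HBaseConfiguration entry point; the ZooKeeper host names and port are placeholders, not values from the original project:

package com.cslc.asiancup.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HbaseUtilJava {
    // Shared HBase configuration used by the MapReduce driver below.
    public static final Configuration conf = HBaseConfiguration.create();

    static {
        // Placeholder quorum and port; replace with the real ZooKeeper hosts.
        conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }
}

With a configuration in hand, the driver itself: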
package com.cslc.asiancup.dfstohbase;
import com.cslc.asiancup.utils.HbaseUtilJava;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @Author :LPJ
* @Date 2018/12/22 11:04
*/
public class CityInfo extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        System.out.println("running..............");
        // Launch the driver through ToolRunner so generic Hadoop options are parsed.
        int exitCode = ToolRunner.run(new CityInfo(), args);
        System.out.println("end..............");
        System.exit(exitCode);
    }
    public int run(String[] args) throws Exception {
        Configuration conf = HbaseUtilJava.conf;
        // Pull in the cluster config files before anything opens a connection.
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        FileSystem fs = FileSystem.get(conf); // fail fast if HDFS is unreachable
        Job job = Job.getInstance(conf);
        job.setJarByClass(CityInfo.class);                    // driver class
        job.setMapperClass(CityInfo.HDFSToHbaseMapper.class); // map stage
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Bind the reduce stage to the target HBase table.
        TableMapReduceUtil.initTableReducerJob("CITY_INFO", CityInfo.HDFSToHbaseReducer.class,
                job, null, null, null, null, false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        // Input path on HDFS.
        Path inputPath = new Path("/cslc/lpj/ascup/city");
        // Path inputPath = new Path("/home/lpj/city");
        FileInputFormat.addInputPath(job, inputPath);
        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; duplicate lines collapse in the shuffle.
            context.write(value, NullWritable.get());
        }
    }
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Expected line layout: PROVINCE_NO \t PROVINCE_NAME \t CITY_NO \t CITY_NAME
            String[] split = key.toString().split("\t");
            // Row key: PROVINCE_NO-CITY_NO. Bytes.toBytes is UTF-8, unlike the
            // platform-dependent String.getBytes().
            Put put = new Put(Bytes.toBytes(split[0] + "-" + split[2]));
            put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("PROVINCE_NO"), Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("PROVINCE_NAME"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("CITY_NO"), Bytes.toBytes(split[2]));
            put.addColumn(Bytes.toBytes("CF1"), Bytes.toBytes("CITY_NAME"), Bytes.toBytes(split[3]));
            context.write(NullWritable.get(), put);
        }
    }
}
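The job assumes the CITY_INFO table already exists with column family CF1. If it does not, it can be created once up front. A minimal sketch, assuming the HBase 1.x Admin API; CreateCityInfoTable is an illustrative class name, and the connection settings come from the same HbaseUtilJava helper assumed above:

import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import com.cslc.asiancup.utils.HbaseUtilJava;

public class CreateCityInfoTable {
    public static void main(String[] args) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(HbaseUtilJava.conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("CITY_INFO");
            if (!admin.tableExists(name)) {
                // One column family, CF1, matching the Puts built in the reducer.
                HTableDescriptor table = new HTableDescriptor(name);
                table.addFamily(new HColumnDescriptor("CF1"));
                admin.createTable(table);
            }
        }
    }
}

For reference, a tab-separated input line such as 110000, 北京, 110100, 北京市 (illustrative values, fields separated by tabs) becomes row key 110000-110100 with the four columns stored under CF1.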