设为首页 加入收藏

TOP

批量插入Hbase数据
2019-04-28 13:43:43 】 浏览:45
Tags:批量 插入 Hbase 数据
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/jin6872115/article/details/86097882

批量往 HBase 插入数据时,只需知道表名、列族、列名等信息,并指定 HBase 的连接配置即可。

package com.cslc.asiancup.dfstohbase;

import com.cslc.asiancup.utils.HbaseUtilJava;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @Author :LPJ
 * @Date 2018/12/22 11:04
 */
public class CityInfo extends Configured implements Tool {


    public static void main(String[] args) throws Exception {
        System.out.println("running..............");
        //指定类名
        int run = ToolRunner.run(new CityInfo(), args);
        System.out.println("end..............");
        System.exit(run);

    }


    public int run(String[] arg0) throws Exception {

        Configuration conf = HbaseUtilJava.conf;
        FileSystem fs = FileSystem.get(conf);
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        Job job = Job.getInstance(conf);
        job.setJarByClass(CityInfo.class); //指定类名
        job.setMapperClass(CityInfo.HDFSToHbaseMapper.class);//指定map
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //指定表名和reduce
        TableMapReduceUtil.initTableReducerJob("CITY_INFO", CityInfo.HDFSToHbaseReducer.class, job,null,null,null,null,false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        //指定路径
        Path inputPath = new Path("/cslc/lpj/ascup/city");
//        Path inputPath = new Path("/home/lpj/city");
        FileInputFormat.addInputPath(job, inputPath);
        boolean isDone = job.waitForCompletion(true);
        return isDone  0 : 1;
    }
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {

            String[] split = key.toString().split("\t");
            //rowkey
            Put put = new Put((split[0]+"-"+split[2]).getBytes());
            //"APP_NUM","APP_AMOUNT"
            put.addColumn("CF1".getBytes(), "PROVINCE_NO".getBytes(), split[0].getBytes());
            put.addColumn("CF1".getBytes(), "PROVINCE_NAME".getBytes(), split[1].getBytes());
            put.addColumn("CF1".getBytes(), "CITY_NO".getBytes(), split[2].getBytes());
            put.addColumn("CF1".getBytes(), "CITY_NAME".getBytes(), split[3].getBytes());
            context.write(NullWritable.get(), put);
        }
    }


}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hbase   snapshot 下一篇HBASE shell 常用命令大全

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目