HBase-to-HBase Data Transfer
2019-04-24 01:46:52
Tags: hbase, data transfer

This walkthrough copies rows from the HBase table hadoop:student into hadoop:mystu with a MapReduce job: a TableMapper reads source rows into a custom Writable, a TableReducer turns each record into a Put on the target table, and a runner wires the two together.

1. Mapper

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

/**
 * Reads each row of the source table and emits (row key, MyTableWritable) pairs.
 * <p>
 * public abstract class TableMapper<KEYOUT, VALUEOUT>
 */
public class HbaseToHbaseMapper extends TableMapper<Text, MyTableWritable> {
    private Text mapKey = new Text();
    private MyTableWritable mapValue = new MyTableWritable();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        // The source row key is the student id.
        String str = Bytes.toString(key.get());
        mapKey.set(str);

        // Collect the cells of the "info" family into qualifier -> value pairs.
        List<Cell> cells = value.listCells();
        HashMap<String, String> cellMap = new HashMap<>();
        for (Cell tmp : cells) {
            String family = Bytes.toString(tmp.getFamilyArray(), tmp.getFamilyOffset(), tmp.getFamilyLength());
            String qKey = Bytes.toString(tmp.getQualifierArray(), tmp.getQualifierOffset(), tmp.getQualifierLength());
            String values = Bytes.toString(tmp.getValueArray(), tmp.getValueOffset(), tmp.getValueLength());
            if (family.equals("info")) {
                cellMap.put(qKey, values);
            }
        }

        // Assumes every row carries "name" and "age" columns stored as strings;
        // Integer.valueOf throws if either is missing or non-numeric.
        mapValue.setId(Integer.valueOf(str));
        mapValue.setName(cellMap.get("name"));
        mapValue.setAge(Integer.valueOf(cellMap.get("age")));
        context.write(mapKey, mapValue);
    }
}
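
The mapper expects each source row to carry string-encoded "name" and "age" cells in the "info" family. As a quick way to get test input, the sketch below (a hypothetical helper, not part of the original article) seeds one such row into hadoop:student with the plain HBase 1.x client API; the quorum host "wang" matches the runner configuration further down.

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SeedStudentTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("hadoop:student"))) {
            // Row key "1" is the student id; name and age are stored as strings,
            // which is what HbaseToHbaseMapper parses with Integer.valueOf.
            Put put = new Put(Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));
            table.put(put);
        }
    }
}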

2. Reducer

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Writes the mapper's records into the target HBase table.
 * Example: hadoop:student -> hadoop:mystu
 * <p>
 * public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
 */
public class HbaseToHbaseReducer extends TableReducer<Text, MyTableWritable, NullWritable> {


    @Override
    protected void reduce(Text key, Iterable<MyTableWritable> values, Context context) throws IOException, InterruptedException {

        for (MyTableWritable tmp : values) {
            // One Put per record, reusing the source row key as the target row key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // id and age are written as 4-byte integers, name as a UTF-8 string.
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"),
                    Bytes.toBytes(tmp.getId()));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
                    Bytes.toBytes(tmp.getName()));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"),
                    Bytes.toBytes(tmp.getAge()));
            context.write(NullWritable.get(), put);
        }
    }
}
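
Because the reducer encodes id and age with Bytes.toBytes(int), they land in hadoop:mystu as 4-byte integers rather than strings. A read-back sketch (again a hypothetical helper, not from the article) shows how the values must be decoded:

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckMystuTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "wang");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("hadoop:mystu"))) {
            Result r = table.get(new Get(Bytes.toBytes("1")));
            // id and age were written with Bytes.toBytes(int) -> decode with toInt;
            // name was written as a string -> decode with toString.
            int id = Bytes.toInt(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")));
            String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            int age = Bytes.toInt(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
            System.out.println(id + " " + name + " " + age);
        }
    }
}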

3. Runner

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;

public class HbaseToHbaseRunner {

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // Point at a local Hadoop install (needed for winutils.exe when running from Windows).
        System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        Configuration conf = new Configuration();

        // HDFS entry point and ZooKeeper settings for the HBase client
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HbaseToHbaseRunnerJob");
        job.setJarByClass(HbaseToHbaseRunner.class);


        // Configure the HBase input side: source table, scan, mapper class, and map output types.
       /* public static void initTableMapperJob(String table, Scan scan,
                Class<? extends TableMapper> mapper,
                Class<?> outputKeyClass,
                Class<?> outputValueClass, Job job) */
        TableMapReduceUtil.initTableMapperJob("hadoop:student", // source HBase table name
                new Scan(), // full-table scan
                HbaseToHbaseMapper.class,
                Text.class,
                MyTableWritable.class,
                job);

     /* public static void initTableReducerJob(String table,
                Class<? extends TableReducer> reducer, Job job) */
        TableMapReduceUtil.initTableReducerJob(
                "hadoop:mystu", // target HBase table name
                HbaseToHbaseReducer.class,
                job);
        // Submit the job, wait for completion, and exit non-zero on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
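
TableMapReduceUtil does not create the output table, so hadoop:mystu (with an "info" family) must exist before the job is submitted. Below is a minimal sketch using the HBase 1.x Admin API, assuming the "hadoop" namespace may not exist yet; the class name is hypothetical.

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateMystuTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "wang");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Create the "hadoop" namespace on first use.
            try {
                admin.getNamespaceDescriptor("hadoop");
            } catch (NamespaceNotFoundException e) {
                admin.createNamespace(NamespaceDescriptor.create("hadoop").build());
            }
            TableName name = TableName.valueOf("hadoop:mystu");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}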

4. Custom Writable class

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyTableWritable implements Writable {
    private int id = 0;
    private String name = null;
    private int age = 0;

    @Override
    public void write(DataOutput out) throws IOException {
        // Fields must be written in the same order they are read back.
        // writeUTF throws a NullPointerException if name was never set.
        out.writeInt(id);
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        age = in.readInt();
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "MyTableWritable{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
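
Since write and readFields must agree on field order, a quick round trip through Hadoop's in-memory buffers (a hypothetical test class, not from the article) confirms the serialization:

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class MyTableWritableTest {
    public static void main(String[] args) throws Exception {
        MyTableWritable before = new MyTableWritable();
        before.setId(1);
        before.setName("tom");
        before.setAge(20);

        // Serialize into an in-memory buffer, then read back into a fresh instance.
        DataOutputBuffer out = new DataOutputBuffer();
        before.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        MyTableWritable after = new MyTableWritable();
        after.readFields(in);
        System.out.println(after); // MyTableWritable{id=1, name='tom', age=20}
    }
}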
