1.map
package mr.hdfstoHbase.HbaseTOHbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
/**
 * Mapper for the HBase-to-HBase copy job.
 * <p>
 * For every scanned row it collects the qualifier/value pairs of the {@code info}
 * column family (decoded as strings) and emits the row key as a {@link Text} together
 * with a {@link MyTableWritable} carrying the id (the row key parsed as an int),
 * the {@code name} column and the {@code age} column parsed as an int.
 * <p>
 * Rows that cannot be converted (no cells, no {@code name}, missing or non-numeric
 * {@code age}, non-numeric row key) are skipped and reported through job counters
 * instead of failing the task with an uninformative exception.
 */
public class HbasetoHbaseMapper extends TableMapper<Text, MyTableWritable> {
    /** Column family whose qualifiers are copied. */
    private static final String INFO_FAMILY = "info";
    /** Counter group under which skipped rows are reported. */
    private static final String COUNTER_GROUP = "HbasetoHbaseMapper";

    // Output objects are reused across map() calls; context.write serializes them immediately.
    private final Text mapKey = new Text();
    private final MyTableWritable mapValue = new MyTableWritable();

    /**
     * Converts one HBase row into a (row key, MyTableWritable) pair.
     *
     * @param key     row key of the scanned row
     * @param value   cells of the scanned row
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Respect offset/length: the writable may expose only a slice of its backing array.
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());

        // listCells() returns null (not an empty list) when the Result holds no cells.
        List<Cell> cells = value.listCells();
        if (cells == null) {
            context.getCounter(COUNTER_GROUP, "EMPTY_ROWS").increment(1);
            return;
        }

        HashMap<String, String> infoColumns = new HashMap<>();
        for (Cell cell : cells) {
            String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            if (!INFO_FAMILY.equals(family)) {
                continue;
            }
            String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String cellValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            infoColumns.put(qualifier, cellValue);
        }

        // A null name cannot be serialized with writeUTF, so treat the column as required.
        String name = infoColumns.get("name");
        if (name == null) {
            context.getCounter(COUNTER_GROUP, "MISSING_NAME").increment(1);
            return;
        }

        int id;
        int age;
        try {
            id = Integer.parseInt(rowKey);
            // parseInt(null) also throws NumberFormatException, which covers a missing age column.
            age = Integer.parseInt(infoColumns.get("age"));
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_ROWS").increment(1);
            return;
        }

        mapKey.set(rowKey);
        mapValue.setId(id);
        mapValue.setName(name);
        mapValue.setAge(age);
        context.write(mapKey, mapValue);
    }
}
2.reduce
package mr.hdfstoHbase.HbaseTOHbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * Reducer that writes the records read from one HBase table into another HBase table.
 * Example: {@code hadoop:student} to {@code hadoop:mystu}.
 * <p>
 * Base class signature: {@code public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>}.
 */
public class HbaseToHbaseReducer extends TableReducer<Text, MyTableWritable, NullWritable> {
    private static final byte[] FAMILY = Bytes.toBytes("info");
    private static final byte[] QUALIFIER_ID = Bytes.toBytes("id");
    private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
    private static final byte[] QUALIFIER_AGE = Bytes.toBytes("age");

    /**
     * Emits one {@link Put} per incoming record, keyed by the reduce key.
     *
     * @param key     row key to write under
     * @param values  records received for this key
     * @param context task context the Puts are written to
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<MyTableWritable> values, Context context) throws IOException, InterruptedException {
        final NullWritable outputKey = NullWritable.get();
        for (MyTableWritable record : values) {
            context.write(outputKey, toPut(key, record));
        }
    }

    /** Builds the Put holding the id, name and age columns of a single record. */
    private static Put toPut(Text key, MyTableWritable record) {
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put row = new Put(rowKey);
        // id and age are int getters, so they are stored in Bytes' binary int encoding.
        row.addColumn(FAMILY, QUALIFIER_ID, Bytes.toBytes(record.getId()));
        row.addColumn(FAMILY, QUALIFIER_NAME, Bytes.toBytes(record.getName()));
        row.addColumn(FAMILY, QUALIFIER_AGE, Bytes.toBytes(record.getAge()));
        return row;
    }
}
3.runner
package mr.hdfstoHbase.HbaseTOHbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
/**
 * Driver for the HBase-to-HBase copy job: scans the {@code hadoop:student} table with
 * {@link HbasetoHbaseMapper} and writes the result into {@code hadoop:mystu} through
 * {@link HbaseToHbaseReducer}.
 */
public class HbaseToHbaseRunner {
    /**
     * Configures and runs the job, blocking until it finishes.
     * Returns normally when the job succeeds and terminates the JVM with exit status 1
     * when it fails, so that calling scripts can detect the failure.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     * @throws IOException            if job setup or communication with the cluster fails
     * @throws ClassNotFoundException if a job class cannot be found
     */
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        Configuration conf = new Configuration();
        // HDFS entry point
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        // ZooKeeper settings used to locate the HBase cluster
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HbaseToHbaseRunnerJob");
        job.setJarByClass(HbaseToHbaseRunner.class);

        // Full-table scan tuned for MapReduce: fetch rows in larger batches and keep the
        // one-off scan from filling the region servers' block cache.
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Input side: initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job)
        TableMapReduceUtil.initTableMapperJob(
                "hadoop:student", // source HBase table
                scan,
                HbasetoHbaseMapper.class,
                Text.class,
                MyTableWritable.class,
                job);

        // Output side: initTableReducerJob(table, reducer, job)
        TableMapReduceUtil.initTableReducerJob(
                "hadoop:mystu", // destination HBase table
                HbaseToHbaseReducer.class,
                job);

        // Run the job; a failed job must not look like a success to the caller.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
4.自定义类
package mr.hdfstoHbase.HbaseTOHbase;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hadoop {@link Writable} carrying one record (id, name, age) between map and reduce.
 * <p>
 * Serialized layout, in order: {@code int id}, {@code UTF name}, {@code int age}.
 */
public class MyTableWritable implements Writable {
    private int id = 0;
    private String name = null;
    private int age = 0;

    /**
     * Serializes the fields in the order id, name, age.
     * A {@code null} name is written as the empty string because
     * {@link DataOutput#writeUTF(String)} cannot encode {@code null}; such a record
     * therefore reads back with an empty name.
     *
     * @param out destination stream
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name == null ? "" : name);
        out.writeInt(age);
    }

    /**
     * Restores the fields in the same order {@link #write(DataOutput)} emitted them.
     *
     * @param in source stream
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        age = in.readInt();
    }

    /** @return the record id */
    public int getId() {
        return id;
    }

    /** @param id the record id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name, or {@code null} if none has been set or read */
    public String getName() {
        return name;
    }

    /** @param name the name; may be {@code null} */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the age */
    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "MyTableWritable{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}