设为首页 加入收藏

TOP

MR方式HDFS数据导入Hbase表数据
2019-04-23 13:46:18 】 浏览:52
Tags:方式 HDFS 数据 导入 Hbase

MR方式HDFS数据导入Hbase表数据,通过ImmutableBytesWritable方式

原始数据是基站码,不方便贴出来,大家见谅。

eg:

代码:

package com.test.transform;

import java.io.IOException;
import java.text.ParseException;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
* @ClassName: HdfsToHbase
* @Description: hdfs数据导入hbase
* @author hello
* @date 2018年2月27日 上午9:24:06
*
*/
public class HdfsToHbase {

public static class MapperClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] lines = value.toString().replaceAll(" ", "").split("\t");
String rowkey = null;
try {
rowkey = getRowkeyByUUId(lines[1], lines[2], lines[3]);//rowkeys生成
} catch (ParseException e) {
e.printStackTrace();
}
Put put = new Put(rowkey.getBytes());
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("mcc"), Bytes.toBytes(lines[0]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("mnc"), Bytes.toBytes(lines[1]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("lac"), Bytes.toBytes(lines[2]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("ci"), Bytes.toBytes(lines[3]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("lat"), Bytes.toBytes(lines[4]));//经度
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("lon"), Bytes.toBytes(lines[5]));//维度
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("acc"), Bytes.toBytes(lines[6]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("date"), Bytes.toBytes(lines[7]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("validity"), Bytes.toBytes(lines[8]));
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("addr"),Bytes.toBytes((lines[9]));//地址
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("province"),Bytes.toBytes(lines[10]));//省
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("city"),Bytes.toBytes(lines[11]));//市
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("district"),Bytes.toBytes(lines[12]));//区
put.addColumn(Bytes.toBytes("ColumnFamily1"), Bytes.toBytes("township"),Bytes.toBytes(lines[13]));//镇
context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
}
}

public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException, ParseException {
long start = System.currentTimeMillis();// 任务开始时间
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "IP地址");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
Job job = Job.getInstance(conf, "import-hbase");
job.setJarByClass(HdfsToHbase.class);
job.setMapperClass(MapperClass.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "cellinfo_test");
FileInputFormat.addInputPath(job, new Path("hdfs://ip地址/cell_text.txt"));
System.out.println(job.waitForCompletion(true) 0 : 1);
long end = System.currentTimeMillis();// 任务结束时间
System.out.println("用时 " + ((end - start) / 1000) + " s");
}
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Solr+Hbase+Hbase Indexer查询方.. 下一篇【四】HBase Shell命令

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目