设为首页 加入收藏

TOP

HBase快速导入数据--BulkLoad(二)
2015-08-31 19:59:45 来源: 作者: 【 】 浏览:184
Tags:HBase 快速 导入 数据 --BulkLoad
在,就将其删除
? ? ? ? }
? ? ? ? FileOutputFormat.setOutputPath(job, output);//输出路径
? ? ? ? Connection connection = ConnectionFactory.createConnection(configuration);
? ? ? ? TableName tableName = TableName.valueOf(TABLE_NAME);
? ? ? ? HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
? ? ? ? job.waitForCompletion(true);
? ? ? ? if (job.isSuccessful()){
? ? ? ? ? ? HFileLoader.doBulkLoad(outputPath, TABLE_NAME);//导入数据
? ? ? ? ? ? return 0;
? ? ? ? } else {
? ? ? ? ? ? return 1;
? ? ? ? }
? ? }


}


BulkLoadMapper.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
?* Created by shaobo on 15-6-9.
?*/
public class BulkLoadMapper extends Mapper {
? ? private String hbaseTable;
? ? private String dataSeperator;
? ? private String columnFamily1;
? ? private String columnFamily2;


? ? public void setup(Context context) {
? ? ? ? Configuration configuration = context.getConfiguration();//获取作业参数
? ? ? ? hbaseTable = configuration.get("hbase.table.name");
? ? ? ? dataSeperator = configuration.get("data.seperator");
? ? ? ? columnFamily1 = configuration.get("COLUMN_FAMILY_1");
? ? ? ? columnFamily2 = configuration.get("COLUMN_FAMILY_2");
? ? }


? ? public void map(LongWritable key, Text value, Context context){
? ? ? ? try {
? ? ? ? ? ? String[] values = value.toString().split(dataSeperator);
? ? ? ? ? ? ImmutableBytesWritable rowKey = new ImmutableBytesWritable(values[0].getBytes());
? ? ? ? ? ? Put put = new Put(Bytes.toBytes(values[0]));
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("month"), Bytes.toBytes(values[1]));
? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("day"), Bytes.toBytes(values[2]));
? ? ? ? ? ? for (int i = 3; i < values.length; ++i){
? ? ? ? ? ? ? ? put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("hour : " + i), Bytes.toBytes(values[i]));
? ? ? ? ? ? }
? ? ? ? ? ? context.write(rowKey, put);
? ? ? ? } catch(Exception exception) {
? ? ? ? ? ? exception.printStackTrace();
? ? ? ? }


? ? }


}


HFileLoader.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;


/**
?* Created by shaobo on 15-6-9.
?*/
public class HFileLoader {
? ? public static void doBulkLoad(String pathToHFile, String tableName){
? ? ? ? try {
? ? ? ? ? ? Configuration configuration = new Configuration();
? ? ? ? ? ? HBaseConfiguration.addHbaseResources(configuration);
? ? ? ? ? ? LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
? ? ? ? ? ? HTable hTable = new HTable(configuration, tableName);//指定表名
? ? ? ? ? ? loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);//导入数据
? ? ? ? ? ? System.out.println("Bulk Load Completed..");
? ? ? ? } catch(Exception exception) {
? ? ? ? ? ? exception.printStackTrace();
? ? ? ? }


? ? }


}


程序编译打包,提交到Hadoop运行


HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar BulkLoad.jar inputpath outputpath1


上述命令用法可参考 44. HBase, MapReduce, and the CLASSPATH


作业运行情况:


15/06/14 14:31:07 INFO mapreduce.HFileOutputFormat2: Looki

首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇MySQL5.6新特性之crash-safe 下一篇Hive本地模式安装及遇到的问题和..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: