设为首页 加入收藏

TOP

HBase快速导入数据--BulkLoad(一)
2015-08-31 19:59:45 来源: 作者: 【 】 浏览:185
Tags:HBase 快速 导入 数据 --BulkLoad

Apache HBase是一个分布式的、面向列的开源数据库,它可以让我们随机的、实时的访问大数据。但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapReduce作业中使用TableOutputFormat作为输出,或者使用标准的客户端API,但是这些都不是非常有效的方法。


Bulkload利用MapReduce作业输出HBase内部数据格式的表数据,然后将生成的StoreFiles直接导入到集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。


Bulkload过程主要包括三部分:


1.从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS


这一步不在HBase的考虑范围内,不管数据源是什么,只要在进行下一步之前将数据上传到HDFS即可。


2.利用一个MapReduce作业准备数据


3.告诉RegionServers数据的位置并导入数据


这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。


下图简单明确的说明了整个过程



图片来自How-to: Use HBase Bulk Loading, and Why


Note:在进行BulkLoad之前,要在HBase中创建与程序中同名且结构相同的空表


Java实现如下:


BulkLoadDriver.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
?* Created by shaobo on 15-6-9.
?*/
public class BulkLoadDriver extends Configured implements Tool {
? ? private static final String DATA_SEPERATOR = "\\s+";
? ? private static final String TABLE_NAME = "temperature";//表名
? ? private static final String COLUMN_FAMILY_1="date";//列组1
? ? private static final String COLUMN_FAMILY_2="tempPerHour";//列组2


? ? public static void main(String[] args) {
? ? ? ? try {
? ? ? ? ? ? int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
? ? ? ? ? ? if(response == 0) {
? ? ? ? ? ? ? ? System.out.println("Job is successfully completed...");
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? System.out.println("Job failed...");
? ? ? ? ? ? }
? ? ? ? } catch(Exception exception) {
? ? ? ? ? ? exception.printStackTrace();
? ? ? ? }
? ? }


? ? public int run(String[] args) throws Exception {
? ? ? ? String outputPath = args[1];
? ? ? ? /**
? ? ? ? * 设置作业参数
? ? ? ? */
? ? ? ? Configuration configuration = getConf();
? ? ? ? configuration.set("data.seperator", DATA_SEPERATOR);
? ? ? ? configuration.set("hbase.table.name", TABLE_NAME);
? ? ? ? configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
? ? ? ? configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
? ? ? ? Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
? ? ? ? job.setJarByClass(BulkLoadDriver.class);
? ? ? ? job.setInputFormatClass(TextInputFormat.class);
? ? ? ? job.setMapOutputKeyClass(ImmutableBytesWritable.class);//指定输出键类
? ? ? ? job.setMapOutputValueClass(Put.class);//指定输出值类
? ? ? ? job.setMapperClass(BulkLoadMapper.class);//指定Map函数
? ? ? ? FileInputFormat.addInputPaths(job, args[0]);//输入路径
? ? ? ? FileSystem fs = FileSystem.get(configuration);
? ? ? ? Path output = new Path(outputPath);
? ? ? ? if (fs.exists(output)) {
? ? ? ? ? ? fs.delete(output, true);//如果输出路径存

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇MySQL5.6新特性之crash-safe 下一篇Hive本地模式安装及遇到的问题和..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: