设为首页 加入收藏

TOP

【HBase基础教程】7、HBase之读取HBase数据写入HDFS
2019-03-15 01:43:15 】 浏览:8
Tags:HBase 基础 教程 读取 数据 写入 HDFS

本blog介绍如何读取Hbase中的数据并写入到HDFS分布式文件系统中。读取数据比较简单,我们借用上一篇【HBase基础教程】6、HBase之读取MapReduce数据写入HBase的hbase数据输出wordcount表作为本篇数据源的输入,编写Mapper函数,读取wordcount表中的数据填充到< key,value>,通过Reduce函数直接输出得到的结果即可。

开发环境


硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点)
软件环境:Java 1.7.0_45、Eclipse Juno Service Release 2、hadoop-1.2.1、hbase-0.94.20。

1、 输入与输出


1)输入数据源:

上一篇【HBase基础教程】6、HBase之读取MapReduce数据写入HBase实现了读取MapReduce数据写入到Hbase表wordcount中,在本篇blog中,我们将wordcount表作为输入数据源。

2)输出目标:

HDFS分布式文件系统中的文件。

2、 Mapper函数实现


WordCountHbaseReaderMapper类继承了TableMapper< Text,Text>抽象类,TableMapper类专门用于完成MapReduce中Map过程与Hbase表之间的操作。此时的map(ImmutableBytesWritable key,Result value,Context context)方法,第一个参数key为Hbase表的rowkey主键,第二个参数value为key主键对应的记录集合,此处的map核心实现是遍历key主键对应的记录集合value,将其组合成一条记录通过contentx.write(key,value)填充到< key,value>键值对中。
详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

public static class WordCountHbaseReaderMapper extends 
    TableMapper<Text,Text>{

    @Override
    protected void map(ImmutableBytesWritable key,Result value,Context context)
            throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer("");
        for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
            String str =  new String(entry.getValue());
            //将字节数组转换为String类型
            if(str != null){
                sb.append(new String(entry.getKey()));
                sb.append(":");
                sb.append(str);
            }
            context.write(new Text(key.get()), new Text(new String(sb)));
        }
    }
}

3、 Reducer函数实现


此处的WordCountHbaseReaderReduce实现了直接输出Map输出的< key,value>键值对,没有对其做任何处理。详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,Text>{
    private Text result = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {
        for(Text val:values){
            result.set(val);
            context.write(key, result);
        }
    }
}

4、 驱动函数实现


与WordCount的驱动类不同,在Job配置的时候没有配置job.setMapperClass(),而是用以下方法执行Mapper类: TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
该方法指明了在执行job的Map过程时,数据输入源是hbase的tablename表,通过扫描读入对象scan对表进行全表扫描,为Map过程提供数据源输入,通过WordCountHbaseReaderMapper.class执行Map过程,Map过程的输出key/value类型是 Text.class与Text.class,最后一个参数是作业对象。特别注意:这里声明的是一个最简单的扫描读入对象scan,进行表扫描读取数据,其中scan可以配置参数,这里为了例子简单不再详述,用户可自行尝试。
详细源码请参考:WordCountHbaseReader\src\com\zonesion\hbase\WordCountHbaseReader.java

public static void main(String[] args) throws Exception {
    String tablename = "wordcount";
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "Master");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 1) {
      System.err.println("Usage: WordCountHbaseReader <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "WordCountHbaseReader");
    job.setJarByClass(WordCountHbaseReader.class);
    //设置任务数据的输出路径;
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
    job.setReducerClass(WordCountHbaseReaderReduce.class);
    Scan scan = new Scan();
    TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
    //调用job.waitForCompletion(true) 执行任务,执行成功后退出;
    System.exit(job.waitForCompletion(true)  0 : 1);


}

5、部署运行


1)启动Hadoop集群和Hbase服务

[hadoop@K-Master ~]$ start-dfs.sh     #启动hadoop HDFS文件管理系统
[hadoop@K-Master ~]$ start-mapred.sh      #启动hadoop MapReduce分布式计算服务
[hadoop@K-Master ~]$ start-hbase.sh       #启动Hbase
[hadoop@K-Master ~]$ jps              #查看进程
22003 HMaster
10611 SecondaryNameNode
22226 Jps
21938 HQuorumPeer
10709 JobTracker
22154 HRegionServer
20277 Main
10432 NameNode

2)部署源码

#设置工作环境
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/Hbase
#部署源码
将WordCountHbaseReader文件夹拷贝到/usr/hadoop/workspace/Hbase/ 路径下;

… 你可以直接 下载 WordCountHbaseReader

3)修改配置文件

a)查看hbase核心配置文件hbase-site.xml的hbase.zookeeper.quorum属性

参考“【HBase基础教程】5、HBase API访问 3、部署运行 3)修改配置文件”查看hbase核心配置文件hbase-site.xml的hbase.zookeeper.quorum属性;

b)修改项目WordCountHbaseWriter/src/config.properties属性文件

将项目WordCountHbaseWriter/src/config.properties属性文件的hbase.zookeeper.quorum属性值修改为上一步查询到的属性值,保持config.properties文件的hbase.zookeeper.quorum属性值与hbase-site.xml文件的hbase.zookeeper.quorum属性值一致;

#切换工作目录
[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/Hbase/ WordCountHbaseReader
#修改属性值
[hadoop@K-Master WordCountHbaseReader]$ vim src/config.properties
hbase.zookeeper.quorum=K-Master
#拷贝src/config.properties文件到bin/文件夹
[hadoop@K-Master WordCountHbaseReader]$ cp src/config.properties bin/

4)编译文件

#切换工作目录
[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/Hbase/WordCountHbaseReader
#执行编译
[hadoop@K-Master WordCountHbaseReader]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar:lib/zookeeper-3.4.5.jar:lib/hbase-0.94.20.jar -d bin/ src/com/zonesion/hbase/WordCountHbaseReader.java
#查看编译文件
[hadoop@K-Master WordCountHbaseReader]$ ls bin/com/zonesion/hbase/ -la
total 20
drwxrwxr-x 2 hadoop hadoop 4096 Dec 29 10:36 .
drwxrwxr-x 3 hadoop hadoop 4096 Dec 29 10:36 ..
-rw-rw-r-- 1 hadoop hadoop 2166 Dec 29 14:31 WordCountHbaseReader.class
-rw-rw-r-- 1 hadoop hadoop 2460 Dec 29 14:31 WordCountHbaseReader$WordCountHbaseReaderMapper.class
-rw-rw-r-- 1 hadoop hadoop 1738 Dec 29 14:31 WordCountHbaseReader$WordCountHbaseReaderReduce.class

5)打包Jar文件

#拷贝lib文件夹到bin文件夹
[hadoop@K-Master WordCountHbaseReader]$ cp -r lib/ bin/
#打包Jar文件
[hadoop@K-Master WordCountHbaseReader]$ jar -cvf WordCountHbaseReader.jar -C bin/ .
added manifest
adding: lib/(in = 0) (out= 0)(stored 0%)
adding: lib/zookeeper-3.4.5.jar(in = 779974) (out= 721150)(deflated 7%)
adding: lib/guava-11.0.2.jar(in = 1648200) (out= 1465342)(deflated 11%)
adding: lib/protobuf-java-2.4.0a.jar(in = 449818) (out= 420864)(deflated 6%)
adding: lib/hbase-0.94.20.jar(in = 5475284) (out= 5038635)(deflated 7%)
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/hbase/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/hbase/PropertiesHelper.class(in = 4480) (out= 1926)(deflated 57%)
adding: com/zonesion/hbase/WordCountHbaseReader.class(in = 2702) (out= 1226)(deflated 54%)
adding: com/zonesion/hbase/WordCountHbaseReader$WordCountHbaseReaderMapper.class(in = 3250) (out= 1275)(deflated 60%)
adding: com/zonesion/hbase/WordCountHbaseReader$WordCountHbaseReaderReduce.class(in = 2308) (out= 872)(deflated 62%)
adding: config.properties(in = 32) (out= 34)(deflated -6%)

6)运行实例

[hadoop@K-Master WordCountHbase]$ hadoop jar WordCountHbaseReader.jar WordCountHbaseReader /user/hadoop/WordCountHbaseReader/output/
...................省略.............
14/12/30 17:51:58 INFO mapred.JobClient: Running job: job_201412161748_0035
14/12/30 17:51:59 INFO mapred.JobClient:  map 0% reduce 0%
14/12/30 17:52:13 INFO mapred.JobClient:  map 100% reduce 0%
14/12/30 17:52:26 INFO mapred.JobClient:  map 100% reduce 100%
14/12/30 17:52:27 INFO mapred.JobClient: Job complete: job_201412161748_0035
14/12/30 17:52:27 INFO mapred.JobClient: Counters: 39
14/12/30 17:52:27 INFO mapred.JobClient:   Job Counters
14/12/30 17:52:27 INFO mapred.JobClient:     Launched reduce tasks=1
14/12/30 17:52:27 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=4913
14/12/30 17:52:27 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/12/30 17:52:27 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/12/30 17:52:27 INFO mapred.JobClient:     Rack-local map tasks=1
14/12/30 17:52:27 INFO mapred.JobClient:     Launched map tasks=1
14/12/30 17:52:27 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13035
14/12/30 17:52:27 INFO mapred.JobClient:   HBase Counters
14/12/30 17:52:27 INFO mapred.JobClient:     REMOTE_RPC_CALLS=8
14/12/30 17:52:27 INFO mapred.JobClient:     RPC_CALLS=8
14/12/30 17:52:27 INFO mapred.JobClient:     RPC_RETRIES=0
14/12/30 17:52:27 INFO mapred.JobClient:     NOT_SERVING_REGION_EXCEPTION=0
14/12/30 17:52:27 INFO mapred.JobClient:     NUM_SCANNER_RESTARTS=0
14/12/30 17:52:27 INFO mapred.JobClient:     MILLIS_BETWEEN_NEXTS=9
14/12/30 17:52:27 INFO mapred.JobClient:     BYTES_IN_RESULTS=216
14/12/30 17:52:27 INFO mapred.JobClient:     BYTES_IN_REMOTE_RESULTS=216
14/12/30 17:52:27 INFO mapred.JobClient:     REGIONS_SCANNED=1
14/12/30 17:52:27 INFO mapred.JobClient:     REMOTE_RPC_RETRIES=0
14/12/30 17:52:27 INFO mapred.JobClient:   File Output Format Counters
14/12/30 17:52:27 INFO mapred.JobClient:     Bytes Written=76
14/12/30 17:52:27 INFO mapred.JobClient:   FileSystemCounters
14/12/30 17:52:27 INFO mapred.JobClient:     FILE_BYTES_READ=92
14/12/30 17:52:27 INFO mapred.JobClient:     HDFS_BYTES_READ=68
14/12/30 17:52:27 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=159978
14/12/30 17:52:27 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=76
14/12/30 17:52:27 INFO mapred.JobClient:   File Input Format Counters
14/12/30 17:52:27 INFO mapred.JobClient:     Bytes Read=0
14/12/30 17:52:27 INFO mapred.JobClient:   Map-Reduce Framework
14/12/30 17:52:27 INFO mapred.JobClient:     Map output materialized bytes=92
14/12/30 17:52:27 INFO mapred.JobClient:     Map input records=5
14/12/30 17:52:27 INFO mapred.JobClient:     Reduce shuffle bytes=92
14/12/30 17:52:27 INFO mapred.JobClient:     Spilled Records=10
14/12/30 17:52:27 INFO mapred.JobClient:     Map output bytes=76
14/12/30 17:52:27 INFO mapred.JobClient:     Total committed heap usage (bytes)=211025920
14/12/30 17:52:27 INFO mapred.JobClient:     CPU time spent (ms)=2160
14/12/30 17:52:27 INFO mapred.JobClient:     Combine input records=0
14/12/30 17:52:27 INFO mapred.JobClient:     SPLIT_RAW_BYTES=68
14/12/30 17:52:27 INFO mapred.JobClient:     Reduce input records=5
14/12/30 17:52:27 INFO mapred.JobClient:     Reduce input groups=5
14/12/30 17:52:27 INFO mapred.JobClient:     Combine output records=0
14/12/30 17:52:27 INFO mapred.JobClient:     Physical memory (bytes) snapshot=263798784
14/12/30 17:52:27 INFO mapred.JobClient:     Reduce output records=5
14/12/30 17:52:27 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1491795968
14/12/30 17:52:27 INFO mapred.JobClient:     Map output records=5

7)查看运行结果

[hadoop@K-Master WordCountHbaseReader]$ hadoop fs  -ls /user/hadoop/WordCountHbaseReader/output/
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2014-07-28 18:04 /user/hadoop/WordCountHbaseReader/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2014-07-28 18:04 /user/hadoop/WordCountHbaseReader/output/_logs
-rw-r--r--   1 hadoop supergroup         76 2014-07-28 18:04 /user/hadoop/WordCountHbaseReader/output/part-r-00000
[hadoop@K-Master WordCountHbaseReader]$ hadoop fs -cat /user/hadoop/WordCountHbaseReader/output/part-r-00000
Bye count:1
Goodbye count:1
Hadoope count:2
Hellope count:2
Worldpe count:2

您可能喜欢

【HBase基础教程】1、HBase之单机模式与伪分布式模式安装
【HBase基础教程】2、HBase之完全分布式模式安装
【HBase基础教程】3、HBase Shell DDL操作
【HBase基础教程】4、HBase Shell DML操作
【HBase基础教程】5、HBase API访问
【HBase基础教程】6、HBase之读取MapReduce数据写入HBase
【HBase基础教程】7、HBase之读取HBase数据写入HDFS


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇sparkstreaming实时写入Hbase(sav.. 下一篇HBase启动HMaster几秒后又死掉的..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }