设为首页 加入收藏

TOP

从HBASE中读取数据,MR进行分析处理输出
2019-04-14 13:45:04 】 浏览:49
Tags:HBASE 读取 数据 进行 分析 处理 输出

从HBASE中读取数据,继承的是TableMapper,不是Mapper,map端主要是从HBASE上获取数据,然后进行输出,输出类型自己可以随意定义。
代码如下:

package HbaseMR;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;

//map端输出两个参数,不是输入
public class MyMap extends TableMapper<Text, HbaseBean2> {

    // HbaseBean hb = new HbaseBean();
    HbaseBean2 v = new HbaseBean2();
    Text k1 = new Text();

    @Override
    //key是rowkey value是rowcell,前两个是输入参数,context是输出
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {

        //遍历添加 column 行
        // value.getColumn(value.get, Bytes.toBytes("userPhoneNum"));
        //定义一个缓冲数组,用于组成key在reduce端可以进行切割处理
        StringBuilder sb = new StringBuilder();
        for (Cell cell : value.rawCells()) {

            // 添 加 / 克 隆 列 族 :info
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // 添 加 / 克 隆 列 :userPhoneNum
                if ("userPhoneNum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {

                    sb.append(Bytes.toString(CellUtil.cloneva lue(cell)));
                    // 添 加 / 克 隆 列 :date
                    // hb.setTimes(0);
                } else if ("date".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {

                    String r = Bytes.toString(CellUtil.cloneva lue(cell));
                    String re = r.substring(0, 7).replace("-", "");
                    sb.append(re + "-");
                    // 添 加 / 克 隆 列 :costtime
                } else if ("costtime".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    //向costtime传值
                    v.setCosttime(Bytes.toString(CellUtil.cloneva lue(cell)));
                    //用于累加次数
                    v.setTimes(1);
                }

            }

        }
        //System.err.println(sb.toString());
        //key的设置
        k1.set(sb.toString());
        //进行输出
        context.write(k1, v);

    }

}

reduce端主要进行业务逻辑的处理,把值累加,
代码如下:

package HbaseMR;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReduce extends Reducer<Text, HbaseBean2, Text, HbaseBean2> {

    @Override
    protected void reduce(Text key, Iterable<HbaseBean2> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        int ti = 0;
        for (HbaseBean2 hb :
                values) {
            //每个手机号的通话时长进行累加
            sum += Integer.parseInt(hb.getCosttime());
            //次数进行累加
            ti += hb.getTimes();
        }
        //重新输出key
        String[] split = key.toString().split("-");
        String res = split[0] + "\t" + split[1];
        Text t = new Text();
        t.set(res);
        //把两个累加的值赋值给bean
        HbaseBean2 s = new HbaseBean2(String.valueOf(sum), ti);

        //进行输出
        context.write(t, s);
    }
}

driver端不同点舌map的设置:

package HbaseMR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;


public class MyDriver {
    private static Configuration conf = new Configuration();
    private static Connection conn;

    static {
        conf = HBaseConfiguration.create();
        //连接zookeeper
        conf.set("hbase.zookeeper.quorum",
                "192.168.146.128,192.168.146.129,192.168.146.130");
        //设置端口号
        conf.set("hbase.master.port", "60000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws Exception {
        // 1、创建Configuration
        Job job = Job.getInstance(conf);

        // 2、设置jar加载路径
        job.setJarByClass(MyDriver.class);

        //配置 Job
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        //加载map和reduce
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        //设置map的输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(HbaseBean2.class);

        //设置reduce的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(HbaseBean2.class);

        //设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者是老
        //job.setNumReduceTasks(0);
        TableMapReduceUtil.initTableMapperJob(
                "test", //数据源的表名
                scan, //scan 扫描控制器
                MyMap.class,//设置 Mapper 类
                Text.class,//设置 Mapper 输出 key 类型
                HbaseBean2.class,//设置 Mapper 输出 value 值类型
                job);//设置给哪个 JOB
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // 提交任务
        boolean result = job.waitForCompletion(true);
        System.out.println(result);
        //System.exit(status);
    }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇iServer集成HBase操作说明 下一篇启动hbase后没有hmaster进程

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目