从HBASE中读取数据,继承的是TableMapper,不是Mapper,map端主要是从HBASE上获取数据,然后进行输出,输出类型自己可以随意定义。
代码如下:
package HbaseMR;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
//map端输出两个参数,不是输入
public class MyMap extends TableMapper<Text, HbaseBean2> {
// HbaseBean hb = new HbaseBean();
HbaseBean2 v = new HbaseBean2();
Text k1 = new Text();
@Override
//key是rowkey value是rowcell,前两个是输入参数,context是输出
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
//遍历添加 column 行
// value.getColumn(value.get, Bytes.toBytes("userPhoneNum"));
//定义一个缓冲数组,用于组成key在reduce端可以进行切割处理
StringBuilder sb = new StringBuilder();
for (Cell cell : value.rawCells()) {
// 添 加 / 克 隆 列 族 :info
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// 添 加 / 克 隆 列 :userPhoneNum
if ("userPhoneNum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
sb.append(Bytes.toString(CellUtil.cloneva lue(cell)));
// 添 加 / 克 隆 列 :date
// hb.setTimes(0);
} else if ("date".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
String r = Bytes.toString(CellUtil.cloneva lue(cell));
String re = r.substring(0, 7).replace("-", "");
sb.append(re + "-");
// 添 加 / 克 隆 列 :costtime
} else if ("costtime".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
//向costtime传值
v.setCosttime(Bytes.toString(CellUtil.cloneva lue(cell)));
//用于累加次数
v.setTimes(1);
}
}
}
//System.err.println(sb.toString());
//key的设置
k1.set(sb.toString());
//进行输出
context.write(k1, v);
}
}
reduce端主要进行业务逻辑的处理,把通话时长和通话次数分别累加。
代码如下:
package HbaseMR;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Aggregates per (month, phone-number) key: total call duration and call count.
 * The incoming key has the form "yyyyMM-phoneNumber" (built by the mapper);
 * the output key is rewritten as "yyyyMM\tphoneNumber".
 */
public class MyReduce extends Reducer<Text, HbaseBean2, Text, HbaseBean2> {
    @Override
    protected void reduce(Text key, Iterable<HbaseBean2> values, Context context)
            throws IOException, InterruptedException {
        long totalCost = 0;
        int callCount = 0;
        // Accumulate duration and number of calls for this key.
        for (HbaseBean2 bean : values) {
            totalCost += Integer.parseInt(bean.getCosttime());
            callCount += bean.getTimes();
        }
        // Turn "yyyyMM-phone" back into a tab-separated output key.
        String[] parts = key.toString().split("-");
        Text outKey = new Text(parts[0] + "\t" + parts[1]);
        // Emit the aggregated totals wrapped in a fresh bean.
        HbaseBean2 out = new HbaseBean2(String.valueOf(totalCost), callCount);
        context.write(outKey, out);
    }
}
driver端不同的是map的设置:
package HbaseMR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver: reads rows from the HBase table "test" via MyMap, aggregates with
 * MyReduce, and writes text output to the HDFS path given as args[0].
 */
public class MyDriver {
private static Configuration conf = new Configuration();
private static Connection conn;
static {
conf = HBaseConfiguration.create();
// ZooKeeper quorum used by the HBase client.
conf.set("hbase.zookeeper.quorum",
"192.168.146.128,192.168.146.129,192.168.146.130");
// HBase master / ZooKeeper client ports.
conf.set("hbase.master.port", "60000");
conf.set("hbase.zookeeper.property.clientPort", "2181");
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// FIX: fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
if (args.length < 1) {
System.err.println("Usage: MyDriver <output-path>");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(MyDriver.class);
// Scan settings for MR jobs: skip the block cache, fetch 500 rows per RPC.
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
// initTableMapperJob configures the mapper class and its output key/value
// types itself, so the redundant setMapperClass/setMapOutput* calls were
// removed. Note: use the mapreduce package, not the legacy mapred one.
TableMapReduceUtil.initTableMapperJob(
"test", // source table name
scan, // scan controller
MyMap.class, // mapper class
Text.class, // mapper output key type
HbaseBean2.class, // mapper output value type
job);
job.setReducerClass(MyReduce.class);
// Reducer (= job) output types.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(HbaseBean2.class);
// Output path.
FileOutputFormat.setOutputPath(job, new Path(args[0]));
// Submit and wait.
boolean result = job.waitForCompletion(true);
// FIX: the connection opened in the static block was never closed (leak).
if (conn != null) {
conn.close();
}
System.out.println(result);
// FIX: propagate job success/failure as the process exit code.
System.exit(result ? 0 : 1);
}
}