源数据:
tom hello hadoop cat
world hadoop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr
是简单的好友列表的差集吗?
最应该推荐的好友TopN,如何排名?
思路: 推荐者与被推荐者一定有一个或多个相同的好友
全局去寻找好友列表中两两关系
去除直接好友
统计两两关系出现次数
API:
map:按好友列表输出两两关系
reduce:sum两两关系 再设计一个MR 生成详细报表
主方法:
package com.bjsxt.fof;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the friend-of-friend (FoF) recommendation job.
 *
 * <p>Reads friend lists from {@code /data/fof/input}, has {@link FMapper}
 * emit every pair relation (direct pairs marked 0, shared-friend pairs
 * marked 1), and has {@link FReducer} sum the shared-friend counts while
 * dropping pairs that are already direct friends. Output goes to
 * {@code /data/fof/outpath}.
 */
public class MyFoF {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        // Required on a real cluster: lets Hadoop locate the jar that
        // contains the mapper/reducer classes. Without it the job fails
        // with ClassNotFoundException when submitted remotely.
        job.setJarByClass(MyFoF.class);
        job.setJobName("fof");
        // input
        Path input = new Path("/data/fof/input");
        FileInputFormat.addInputPath(job, input);
        // output: a pre-existing output directory makes the job abort at
        // startup, so remove any stale one first.
        Path output = new Path("/data/fof/outpath");
        if (output.getFileSystem(conf).exists(output)) {
            // delete(Path) is deprecated; pass recursive=true explicitly.
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // map side
        job.setMapperClass(FMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce side
        job.setReducerClass(FReducer.class);
        // submit and block until the job finishes
        job.waitForCompletion(true);
    }
}
map端方法:
package com.bjsxt.fof;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across map() calls to avoid one allocation per record —
    // the original allocated fresh Text/IntWritable on every invocation.
    private final Text outKey = new Text();
    private final IntWritable outVal = new IntWritable();

    /**
     * One input line is {@code owner f1 f2 ... fn}.
     *
     * <p>Emits {@code "a:b" -> 0} for every direct friendship
     * {@code (owner, fi)} and {@code "a:b" -> 1} for every pair
     * {@code (fi, fj)} of the owner's friends, who therefore share the
     * owner as a common friend. Keys are normalized so that
     * {@code a:b} and {@code b:a} collapse to the same key.
     */
    @Override
    public void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] names = StringUtils.split(value.toString(), ' ');
        String owner = names[0];
        for (int i = 1; i < names.length; i++) {
            // Direct friendship: marker 0 lets the reducer filter the pair out.
            outKey.set(getfof(owner, names[i]));
            outVal.set(0);
            context.write(outKey, outVal);
            // Every pair of the owner's friends shares at least this one
            // common friend: marker 1 contributes to the recommendation count.
            for (int j = i + 1; j < names.length; j++) {
                outKey.set(getfof(names[i], names[j]));
                outVal.set(1);
                context.write(outKey, outVal);
            }
        }
    }

    /** Orders the two names lexicographically so the pair key is canonical. */
    private static String getfof(String s1, String s2) {
        return s1.compareTo(s2) < 0 ? s1 + ":" + s2 : s2 + ":" + s1;
    }
}
reduce端方法:
package com.bjsxt.fof;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value, same instance across reduce() calls.
    IntWritable rval = new IntWritable();

    /**
     * Receives all markers for one pair key {@code "a:b"}, e.g.:
     * <pre>
     *   hadoop:hello 0
     *   hadoop:hello 1
     *   hadoop:hello 0
     * </pre>
     * A 0 marker means the two people are already direct friends, so the
     * pair is suppressed entirely. Otherwise the sum of the 1 markers is
     * the number of common friends and is written as the recommendation
     * weight.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        boolean isDirectFriend = false;
        int commonFriends = 0;
        for (IntWritable marker : values) {
            if (marker.get() == 0) {
                isDirectFriend = true;
            }
            commonFriends += marker.get();
        }
        if (!isDirectFriend) {
            rval.set(commonFriends);
            context.write(key, rval);
        }
    }
}
提交命令:
总结:
1.写前边的配置部分:
Configuration conf = new Configuration(true);
Job job =Job.getInstance(conf);
2.写in:
Path input = new Path("/data/fof/input");
FileInputFormat.addInputPath(job, input );
3.写out:
Path output = new Path("/data/fof/outpath");
if (output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output);
}
FileOutputFormat.setOutputPath(job, output );
4.写map端:
job.setMapperClass(FMapper.class);
5.写map端的输出key的类型:
job.setMapOutputKeyClass(Text.class);
6.写map端输出value的类型:
job.setMapOutputValueClass(IntWritable.class);
7.写reduce端
job.setReducerClass(FReducer.class);
8.写提交部分
job.waitForCompletion(true);
(本次省略了map端的sort和reduce端的sort)