一、需求描述:
输入数据每行的格式为 usr:friend,friend,friend...(冒号前是用户,冒号后是该用户的好友列表,以逗号分隔)。要求找出所有互为好友的用户对,并列出每对用户的共同好友。
---------------
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
最终结果(每行为一对互为好友的用户,后面是他们的共同好友列表):
A,B C,E
A,C D,F
A,D F,E
A,F B,C,D,E,O
B,E C
C,F A,D
D,E L
D,F A,E
D,L E,F
E,L D
F,M E
H,O A
I,O A
package com.friends.zb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
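/**
 * Find friends (找朋友): two chained MapReduce passes that list, for every pair of
 * users who name each other as friends, the friends the two have in common.
 *
 * @author zhangbing
 */
public class Friends {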
public static class M1 extends Mapper<LongWritable , Text,Text,Text>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split(":"); String[] split2 = split[1].split(","); for(String s:split2){ context.write(new Text(s), new Text(split[0])); } } } public static class R1 extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { List<String> list = new ArrayList<>();
for (Text t : values) { list.add(t.toString()); } Text k = new Text(); for (String s1 : list) { for (String s2 : list) { if(s1.compareTo(s2)<0){ k.set(s1+","+s2); context.write(k, key); } } String string = s1+","+key.toString(); if(s1.compareTo(key.toString())>0){ string=key.toString()+","+s1; } context.write(new Text(string), new Text("1")); } } }
/**
 * Second-pass mapper: splits each input line on tab characters and re-emits the first
 * field as key and the second field as value.
 *
 * <p>Lines with fewer than two tab-separated fields are skipped instead of failing with
 * {@link ArrayIndexOutOfBoundsException}.
 */
public static class M2 extends Mapper<LongWritable, Text, Text, Text> {

    /** Reused output holders, so no new objects are allocated per record. */
    private final Text pairKey = new Text();
    private final Text payload = new Text();

    /**
     * @param key     input key supplied by the input format; not used
     * @param value   one line of tab-separated text
     * @param context sink for the re-keyed record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            return; // blank or malformed line: nothing to forward
        }
        pairKey.set(fields[0]);
        payload.set(fields[1]);
        context.write(pairKey, payload);
    }
}

/**
 * Second-pass reducer: for one pair key, counts the {@code "1"} marker values and joins
 * all other values with commas.
 *
 * <p>The pair is written only when exactly two markers were seen and at least one
 * non-marker value exists. R1 in this file emits one marker per direction of a
 * friendship, so two markers mean the two users list each other; the non-marker values
 * are their common friends.
 *
 * <p>NOTE(review): a value that is literally {@code "1"} is always counted as a marker,
 * so a user named "1" could not appear as a common friend.
 */
public static class R2 extends Reducer<Text, Text, Text, Text> {

    /** Reused holder for the joined output value. */
    private final Text joinedValue = new Text();

    /**
     * @param key     the pair {@code "x,y"}
     * @param values  markers and common-friend names recorded for that pair
     * @param context sink for the final {@code (pair, commonFriends)} record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        int markers = 0;
        boolean hasCommon = false;
        StringBuilder joined = new StringBuilder();

        for (Text v : values) {
            String s = v.toString();
            if ("1".equals(s)) {
                markers++;
                continue;
            }
            if (hasCommon) {
                joined.append(',');
            }
            joined.append(s);
            hasCommon = true;
        }

        if (markers == 2 && hasCommon) {
            joinedValue.set(joined.toString());
            context.write(key, joinedValue);
        }
    }
}

public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Friends.class); job.setMapperClass(M1.class); job.setReducerClass(R1.class); job.setOutputKeyClass(Text.class); job.set |