MapReduce Project: Friend Recommendation (Friends-of-Friends) [Java Code]
2018-11-13 14:55:51

Source data (each line is a user followed by that user's direct friends):

tom hello hadoop cat
world hadoop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr

Is this just a simple set difference between friend lists?

How do we rank the top-N friends most worth recommending?

Idea: a person and the friend recommended to them must share one or more common friends.

Across the whole dataset, enumerate every pairwise relation that appears within a friend list.

Remove pairs that are already direct friends.

Count how many times each remaining pair occurs; the count is the number of common friends.

API:

map: for each friend list, emit the pairwise relations

reduce: sum each pairwise relation's occurrences; a second MR job can then be designed to generate a detailed report (see the sketch at the end of this article)
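
For the first line "tom hello hadoop cat", for example, the map phase emits the following records (keys are already in the canonical order produced by the getfof helper shown below):

hello:tom 0      (tom and hello are direct friends: marker 0)
hadoop:tom 0
cat:tom 0
hadoop:hello 1   (hello and hadoop share the common friend tom: marker 1)
cat:hello 1
cat:hadoop 1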

Driver (main method):

package com.bjsxt.fof;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFoF {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration(true);
		Job job = Job.getInstance(conf);
		job.setJarByClass(MyFoF.class); // needed so the cluster can locate the jar containing this job

		// input path
		Path input = new Path("/data/fof/input");
		FileInputFormat.addInputPath(job, input);

		// output path: delete it first if it already exists
		Path output = new Path("/data/fof/outpath");
		if (output.getFileSystem(conf).exists(output)) {
			output.getFileSystem(conf).delete(output, true); // recursive delete; the one-argument delete(Path) is deprecated
		}
		FileOutputFormat.setOutputPath(job, output);

		// map side
		job.setMapperClass(FMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// reduce side
		job.setReducerClass(FReducer.class);

		// submit and wait for completion
		job.waitForCompletion(true);
	}

}
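
One detail the driver leaves implicit: it never declares the job's final output key/value types. This works because TextOutputFormat simply calls toString() on whatever the reducer writes, but the contract can be made explicit with two more lines (an optional addition, not in the original code):

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);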

Map-side class:

package com.bjsxt.fof;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	// reuse the output objects across map() calls instead of reallocating per record
	Text mkey = new Text();
	IntWritable mval = new IntWritable();

	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// line format: user friend1 friend2 ...
		String[] strs = StringUtils.split(value.toString(), ' ');
		for (int i = 1; i < strs.length; i++) {
			// the user and each person in his list are direct friends: marker 0
			mkey.set(getfof(strs[0], strs[i]));
			mval.set(0);
			context.write(mkey, mval);

			// any two people in the same list share at least one common
			// friend (the list's owner): marker 1
			for (int j = i + 1; j < strs.length; j++) {
				mkey.set(getfof(strs[i], strs[j]));
				mval.set(1);
				context.write(mkey, mval);
			}
		}
	}

	// order the two names lexicographically so that (a,b) and (b,a)
	// produce the same key and land in the same reduce group
	private static String getfof(String s1, String s2) {
		if (s1.compareTo(s2) < 0) {
			return s1 + ":" + s2;
		} else {
			return s2 + ":" + s1;
		}
	}
}
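
Both argument orders of getfof yield the same canonical key, for example:

getfof("tom", "cat")  // returns "cat:tom"
getfof("cat", "tom")  // also returns "cat:tom"

so the pair (tom, cat) always arrives in the same reduce group no matter whose friend list it came from.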

Reduce-side class:

package com.bjsxt.fof;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	IntWritable rval = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {

		// All values with the same key form one group, and each group gets
		// one reduce() call; iterate the group. For example, for the key
		// hadoop:hello the values might arrive as:
		//   hadoop:hello 0
		//   hadoop:hello 1
		//   hadoop:hello 0
		//   hadoop:hello 1
		//   hadoop:hello 0

		int flg = 0; // becomes 1 if the pair are direct friends
		int sum = 0; // number of common friends
		for (IntWritable v : values) {
			if (v.get() == 0) {
				flg = 1;
			}
			sum += v.get();
		}
		// only recommend pairs that are not already direct friends
		if (flg == 0) {
			rval.set(sum);
			context.write(key, rval);
		}
	}
}
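
Tracing the sample data through both phases by hand, the job should produce the following output (value = number of common friends; MapReduce emits the keys in sorted order):

cat:hadoop   2
cat:hello    2
cat:mr       1
cat:world    1
hadoop:hello 3
hadoop:mr    1
hive:tom     3
mr:tom       1
mr:world     2
tom:world    2

Note that FReducer cannot be reused as a combiner: a combiner running on one mapper's partial output could sum up 1-markers while the 0-marker for the same pair sits in another mapper's output, so the reducer would see only a positive sum and wrongly recommend a pair of direct friends.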

Submit command:
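
A typical submission, assuming the classes are packaged into a jar named myfof.jar (name hypothetical) and the sample data has been uploaded to /data/fof/input in HDFS:

hadoop jar myfof.jar com.bjsxt.fof.MyFoF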

Summary:

1. Write the up-front configuration:

Configuration conf = new Configuration(true);
Job job = Job.getInstance(conf);

2. Write the input:

Path input = new Path("/data/fof/input");
FileInputFormat.addInputPath(job, input);

3. Write the output, deleting the path first if it already exists:

Path output = new Path("/data/fof/outpath");
if (output.getFileSystem(conf).exists(output)) {
	output.getFileSystem(conf).delete(output, true);
}
FileOutputFormat.setOutputPath(job, output);

4. Set the mapper:

job.setMapperClass(FMapper.class);

5. Set the map output key type:

job.setMapOutputKeyClass(Text.class);

6. Set the map output value type:

job.setMapOutputValueClass(IntWritable.class);

7. Set the reducer:

job.setReducerClass(FReducer.class);

8. Submit the job:

job.waitForCompletion(true);

(The map-side sort and the reduce-side sort are omitted from this walkthrough.)
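
As mentioned above, a second MR job can turn these pair counts into the per-user top-N recommendation report. A minimal sketch, assuming the first job's output is read back in TextOutputFormat's default key<TAB>value layout (all class names here are hypothetical, not part of the original code):

package com.bjsxt.fof;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class FofReport {

	// map each "a:b<TAB>count" line to two per-user records:
	// a -> b:count and b -> a:count
	public static class RMapper extends Mapper<LongWritable, Text, Text, Text> {
		Text user = new Text();
		Text rec = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] kv = value.toString().split("\t");
			String[] pair = kv[0].split(":");
			user.set(pair[0]);
			rec.set(pair[1] + ":" + kv[1]);
			context.write(user, rec);
			user.set(pair[1]);
			rec.set(pair[0] + ":" + kv[1]);
			context.write(user, rec);
		}
	}

	// sort each user's candidates by common-friend count and keep the top N
	public static class RReducer extends Reducer<Text, Text, Text, Text> {
		static final int N = 3; // arbitrary cut-off for this sketch

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			List<String[]> cands = new ArrayList<String[]>();
			for (Text v : values) {
				cands.add(v.toString().split(":")); // [friend, count]
			}
			cands.sort((a, b) -> Integer.parseInt(b[1]) - Integer.parseInt(a[1]));
			StringBuilder sb = new StringBuilder();
			for (int i = 0; i < Math.min(N, cands.size()); i++) {
				if (i > 0) sb.append(',');
				sb.append(cands.get(i)[0]).append('(').append(cands.get(i)[1]).append(')');
			}
			context.write(key, new Text(sb.toString()));
		}
	}
}

For user tom, for example, this would emit: tom  hive(3),world(2),mr(1)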
