设为首页 加入收藏

TOP

【七】hadoop编程之基于物品的协同过滤推荐算法ItemCF
2019-04-18 00:39:01 】 浏览:85
Tags:hadoop 编程 基于 物品 协同 过滤 推荐 算法 ItemCF
版权声明:转载注明出处 https://blog.csdn.net/jy02268879/article/details/80370997

基于物品的协同过滤推荐算法:给用户推荐一些他们以前感兴趣的物品相似的物品

模拟情景

用户 A B C

商品 1 2 3 4 5 6

行为点击 1.0分搜索 2.0分收藏 5.0分付款 10.0分


用户行为列表如下

用户 物品 行为

A 1 点击

C 3 收藏

B 2 搜索

B 5 搜索

B 6 收藏

A 2 付款

C 3 付款

C 4 收藏

C 1 收藏

A 1 点击

A 6 收藏

A 4 搜索

1.根据用户行为列表得到用户、物品的评分矩阵

A B C

1 点击*2 无 收藏

2 付款 搜索 无

3 无 无 收藏+付款

4 搜索 无 收藏

5 无 搜索 无

6 收藏 收藏 无

算出的权重代表用户对物品的喜好程度

A B C

1 2 0 5

2 10 3 0

3 0 0 15

4 3 0 5

5 0 3 0

6 5 5 0

2.根据用户物品评分矩阵计算物品与物品的相似度矩阵

A B C

1 2 0 5

2 10 3 0

物品1与2的相似度

1 2 3 4 5 6

1 1 0.36 0.93 0.99 0 0.26

2 0.36 1.0 0 0.49 0.29 0.88

3 0.93 0 1.0 0.86 0 0

4 0.99 0.49 0.86 1.0 0 0.36

5 0 0.29 0 0 1.0 0.71

6 0.26 0.88 0 0.36 0.71 1.0

3.相似度矩阵*评分矩阵=推荐列表

A B C

1 9.9 2.4 23.9

2 16.6 8.3 4.3

3 4.4 0 24.0

4 11.7 3.3 22.9

5 6.5 7.4 0

6 15.4 9.8 3.1

4.在推荐列表中,将之前产生过操作的物品(即评分矩阵中有过评分的物品)置零

A B C

1 0 2.4 0

2 0 0 4.3

3 4.4 0 0

4 0 3.3 0

5 6.5 0 0

6 0 0 3.1

5.取每个用户最感兴趣的物品来推荐


项目目录:


输入文件如下


MapReduce步骤

1.根据用户行为列表构建评分矩阵

输入:用户ID,物品ID,分值

输出:物品ID(行)——用户ID(列)——分值

代码:

mapper1

package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:36:18
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据用户行为列表得到用户、物品的评分矩阵
 */
public class Mapper1  extends Mapper<LongWritable, Text, Text, Text>  {
	private Text outKey = new Text();
	private Text outValue = new Text();
	/**
	 * key:行号1
	 * value:A,1,1	用户A对物品1有过点击操作(分值1)
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       String[] values = value.toString().split(",");
       String userID = values[0];
       String itemID = values[1];
       String score = values[2];
       
      
    	   
	   //key:列号	物品ID	value:行号_值	用户ID_分值
	   outKey.set(itemID);
	   outValue.set(userID+"_"+score);
	   
	   context.write(outKey, outValue);
       
    } 
}

reducer1

package step1;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:56:28
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 
 * 根据用户行为列表得到用户、物品的评分矩阵
 */
public class Reducer1 extends Reducer<Text, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //key:行号 物品ID		value:列号_值,列号_值,列号_值,列号_值,列号_值...    用户ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		String itemID=key.toString();
		
		//userID,score
		Map<String,Integer> map = new HashMap<>();
		
		//text:行号_值
		for(Text value:values){  
			String[] split = value.toString().split("_");
			String userID = split[0];
			String score = split[1];
			
			if(map.get(userID)==null){
				map.put(userID, Integer.parseInt(score));
			}else{
				Integer preScore = map.get(userID);
				map.put(userID, preScore+Integer.parseInt(score));
			}
		}
		StringBuilder sb =  new StringBuilder();
		for(Map.Entry<String,Integer> entry:map.entrySet()){
			String userID = entry.getKey();
			String score = String.valueOf(entry.getValue());
			sb.append(userID).append("_").append(score).append(",");
		}
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	
		//key:行号 物品ID		value:列号_值,列号_值,列号_值,列号_值,列号_值...    用户ID_分值
		outKey.set(itemID);
		outValue.set(line);
		
		context.write(outKey,outValue);  
	}
	
	
}

mr1

package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:07:13
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据用户行为列表得到用户、物品的评分矩阵
 */
public class MR1 {
	private static String inputPath = "/ItemCF/step1_input/actionList.txt";
	private static String outputPath = "/ItemCF/step1_output";
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step1");
		
		
		//配置任务map和reduce类  
		job.setJarByClass(MR1.class);  
		job.setJar("F:\\eclipseworkspace\\ItemCF\\ItemCF.jar");  
	      job.setMapperClass(Mapper1.class);  
	      job.setReducerClass(Reducer1.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return -1;
	}
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR1().run();
		if(result==1){
			System.out.println("step1运行成功");
		}else if(result==-1){
			System.out.println("step1运行失败");
		}
	  }
}

输出结果



2.利用评分矩阵,构建物品与物品的相似度矩阵

输入:步骤1输出

缓存:步骤1输出

(输出与缓存是相同文件)

输出:物品ID(行)——物品ID(列)——相似度

代码:

mapper2:

package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:51
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 
 * 根据用户物品评分矩阵计算物品与物品的相似度矩阵
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	private List<String> cacheList = new ArrayList<String>();
	//			右矩阵列值    下标右行       右值
	//private Map<String,String[]> cacheMap = new HashMap<>();
	
	private DecimalFormat df = new DecimalFormat("0.00");
	
	/**在map执行之前会执行这个方法,只会执行一次
	 * 
	 * 通过输入流将全局缓存中的矩阵读入一个java容器中
	 */
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
		super.setup(context);
		FileReader fr = new FileReader("itemUserScore1");
		BufferedReader br  = new BufferedReader(fr);
		
		//右矩阵	
		//key:行号 物品ID		value:列号_值,列号_值,列号_值,列号_值,列号_值...    用户ID_分值
		String line = null;
		while((line=br.readLine())!=null){
			cacheList.add(line);
			/**String[] cloumnAndLine_matrix2 = line.split("\t");
			String itemID = cloumnAndLine_matrix2[0];
			String[] row_value_array_matrix2 =cloumnAndLine_matrix2[1].split(",");
			String[] row_value_list_matrix2 = new String[row_value_array_matrix2.length];
			for(int i = 0;i<row_value_array_matrix2.length;i++){
				String row_value = row_value_array_matrix2[i];
				String[] split = row_value.split("_");
				String userID = split[0];
				String score = split[1];
				row_value_list_matrix2[Integer.parseInt(userID)-1]=score;
			}
			cacheMap.put(itemID, row_value_list_matrix2);*/
		}
		
		fr.close();
		br.close();
	}


	/**
	 * key: 行号	物品ID
	 * value:行	列_值,列_值,列_值,列_值	用户ID_分值
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       
       String[] rowAndLine_matrix1 = value.toString().split("\t");
       
       //矩阵行号
       String row_matrix1 = rowAndLine_matrix1[0];
       //列_值
       String[] cloumn_value_array_matrix1 = rowAndLine_matrix1[1].split(",");
       
       //计算左侧矩阵行的空间距离
       double denominator1 = 0;
       for(String column_value:cloumn_value_array_matrix1){
    	   String score = column_value.split("_")[1];
    	   denominator1 += Double.valueOf(score)*Double.valueOf(score);
       }
       denominator1 = Math.sqrt(denominator1);
       /** 
       //右矩阵列集合
       Set<String> cloumns_matrix2 = cacheMap.keySet();
       
       for(String cloumn_matrix2:cloumns_matrix2){
	       //矩阵两位相乘得到的结果	分子
		   int numerator = 0;
		   String[] row_value_list_matrix2 = cacheMap.get(cloumn_matrix2);//取右矩阵第n行  即是N物品所有的用户评分
	       
	       //计算右侧矩阵行的空间距离
	       double denominator2 = 0;
	       for(String column_value:row_value_list_matrix2){
	    	   String score = column_value.split("_")[1];
	    	   denominator2 += Double.valueOf(score)*Double.valueOf(score);
	       }
	       denominator2 = Math.sqrt(denominator2);
		   
		   
		   for(String cloumn_value_matrix1:cloumn_value_array_matrix1){
	    	  
	    	   String[] split = cloumn_value_matrix1.split("_");
	    	   int cloumn_matrix1 = Integer.parseInt(split[0]);
	    	   int v_matrix1 = Integer.parseInt(split[1]);
	    	   int v_matrix2 = Integer.parseInt(row_value_list_matrix2[cloumn_matrix1-1]);//取右矩阵第n列第cloumn_matrix1行
	    	   numerator +=v_matrix1*v_matrix2;
			
	       }
	   	
		   double cos = numerator/(denominator1*denominator2);
		   if(cos == 0){
			   continue;
		   }
		   
	  	   //cos就是结果矩阵中的某个元素,坐标
	  	   outKey.set(row_matrix1);
	  	   outValue.set(cloumn_matrix2+"_"+df.format(cos));
	  	   System.out.println("mapper2---send-->key:"+outKey+" value:"+outValue);
	  	   //输出格式为	key:行 物品ID	value:列_值	用户ID_分值
	  	   context.write(outKey, outValue);
       }*/
       for(String line:cacheList){
    	   
    	   String[] rowAndLine_matrix2 = line.toString().split("\t");
    	   //右侧矩阵line
    	   //格式: 列 tab 行_值,行_值,行_值,行_值
    	   String cloumn_matrix2 = rowAndLine_matrix2[0];
    	   String[] row_value_array_matrix2 = rowAndLine_matrix2[1].split(",");
    	   
    	 //计算右侧矩阵行的空间距离
	       double denominator2 = 0;
	       for(String column_value:row_value_array_matrix2){
	    	   String score = column_value.split("_")[1];
	    	   denominator2 += Double.valueOf(score)*Double.valueOf(score);
	       }
	       denominator2 = Math.sqrt(denominator2);
    	   
	       //矩阵两位相乘得到的结果	分子
		   int numerator = 0;
    	   
    	   
    	   //遍历左侧矩阵一行的每一列
    	   
    	  for(String cloumn_value_matrix1:cloumn_value_array_matrix1){
    		  String cloumn_matrix1 = cloumn_value_matrix1.split("_")[0];
    		  String value_matrix1 = cloumn_value_matrix1.split("_")[1];
    		  
    		  //遍历右侧矩阵一行的每一列
    		  for(String cloumn_value_matrix2:row_value_array_matrix2){
    			  if(cloumn_value_matrix2.startsWith(cloumn_matrix1+"_")){
    				  String value_matrix2 = cloumn_value_matrix2.split("_")[1];
    				  //将两列的值相乘并累加
    				  numerator+= Integer.valueOf(value_matrix1)*Integer.valueOf(value_matrix2);
    				  
    			  }
    		  }
    	  }
    	  
		   double cos = numerator/(denominator1*denominator2);
		   if(cos == 0){
			   continue;
		   }
    	  
    	  //cos就是结果矩阵中的某个元素,坐标	行:row_matrix1 	列:row_matrix2(右侧矩阵已经被转置)
    	  outKey.set(row_matrix1);
    	  outValue.set(cloumn_matrix2+"_"+df.format(cos));
    	  //输出格式为	key:行	value:列_值
    	  context.write(outKey, outValue);
       }
    } 
}

reducer2:

package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:59
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据用户物品评分矩阵计算物品与物品的相似度矩阵
 */
public class Reducer2 extends Reducer<Text, Text, Text, Text>{
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //	key:行 物品ID	value:列_值	用户ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();

		for(Text text:values){  
			sb.append(text+",");
        }
		
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	

		outKey.set(key);
		outValue.set(line);

		context.write(outKey,outValue);  
	}
	
}

mr2:

package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:44:07
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据用户物品评分矩阵计算物品与物品的相似度矩阵
 */
public class MR2 {
	private static String inputPath = "/ItemCF/step1_output";
	private static String outputPath = "/ItemCF/step2_output";
	//将step1中输出的转置矩阵作为全局缓存
	private static String cache="/ItemCF/step1_output/part-r-00000";
	
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step2");
		//如果未开启,使用 FileSystem.enableSymlinks()方法来开启符号连接。
		FileSystem.enableSymlinks();
		//要使用符号连接,需要检查是否启用了符号连接
		 boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
		 System.out.println(areSymlinksEnabled);
		//添加分布式缓存文件
		job.addCacheArchive(new URI(cache+"#itemUserScore1"));
		
	
		//配置任务map和reduce类  
		job.setJarByClass(MR2.class);  
		job.setJar("F:\\eclipseworkspace\\ItemCF\\ItemCF.jar");  
	      job.setMapperClass(Mapper2.class);  
	      job.setReducerClass(Reducer2.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}
	
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR2().run();
		if(result==1){
			System.out.println("step2运行成功");
		}else if(result==-1){
			System.out.println("step2运行失败");
		}
	  }
}

输出结果:



3.将评分矩阵转置

输入:步骤1输出

输出:用户ID(行)——物品ID(列)——分值

代码:

mapper3:

package step3;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:36:18
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 将评分矩阵转置
 */
public class Mapper3  extends Mapper<LongWritable, Text, Text, Text>  {
	private Text outKey = new Text();
	private Text outValue = new Text();
	/**
	 * key:1
	 * value:1	1_0,2_3,3_-1,4_2,5_-3
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       String[] rowAndLine = value.toString().split("\t");
       
       //矩阵行号	物品ID
       String itemID = rowAndLine[0];
       //列值	用户ID_分值
       String[] lines = rowAndLine[1].split(",");
       
       
       for(int i = 0 ; i<lines.length; i++){
    	   String userID = lines[i].split("_")[0];
    	   String score = lines[i].split("_")[1];
    	   
    	   //key:列号	 用户ID	value:行号_值	 物品ID_分值
    	   outKey.set(userID);
    	   outValue.set(itemID+"_"+score);
    	   
    	   context.write(outKey, outValue);
       }
    } 
}

reducer3:

package step3;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:56:28
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 
 * 
 * 将评分矩阵转置
 */
public class Reducer3 extends Reducer<Text, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //key:列号	 用户ID		value:行号_值,行号_值,行号_值,行号_值...	物品ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();
		
		//text:行号_值		物品ID_分值
		for(Text text:values){  
            sb.append(text).append(",");
        }  
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	
		
		outKey.set(key);
		outValue.set(line);
		
		context.write(outKey,outValue);  
	}
	
	
}

mr3:

package step3;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:07:13
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 将评分矩阵转置
 */
public class MR3 {
	private static String inputPath = "/ItemCF/step1_output";
	private static String outputPath = "/ItemCF/step3_output";
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step3");
		
		
		//配置任务map和reduce类  
		job.setJarByClass(MR3.class);  
		job.setJar("F:\\eclipseworkspace\\ItemCF\\ItemCF.jar");  
	      job.setMapperClass(Mapper3.class);  
	      job.setReducerClass(Reducer3.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return -1;
	}
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR3().run();
		if(result==1){
			System.out.println("step3运行成功");
		}else if(result==-1){
			System.out.println("step3运行失败");
		}
	  }
}

结果:


4.物品与物品的相似度矩阵X评分矩阵(经过步骤3转置)

输入:步骤2输出

缓存:步骤3输出

输出:物品ID(行)——用户ID(列)——分值

代码:

mapper4:

package step4;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:51
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 
 * 物品和物品的相似度矩阵X评分矩阵(经过步骤3转置)
 */
public class Mapper4 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	private List<String> cacheList = new ArrayList<String>();
	
	private DecimalFormat df = new DecimalFormat("0.00");
		
	/**在map执行之前会执行这个方法,只会执行一次
	 * 
	 * 通过输入流将全局缓存中的矩阵读入一个java容器中
	 */
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
		super.setup(context);
		FileReader fr = new FileReader("itemUserScore2");
		BufferedReader br  = new BufferedReader(fr);
		
		//右矩阵	
		//key:行号 物品ID		value:列号_值,列号_值,列号_值,列号_值,列号_值...    用户ID_分值
		String line = null;
		while((line=br.readLine())!=null){
			cacheList.add(line);
		}
		
		fr.close();
		br.close();
	}


	/**
	 * key: 行号	物品ID
	 * value:行	列_值,列_值,列_值,列_值	用户ID_分值
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       
       String[] rowAndLine_matrix1 = value.toString().split("\t");
       
       //矩阵行号
       String row_matrix1 = rowAndLine_matrix1[0];
       //列_值
       String[] cloumn_value_array_matrix1 = rowAndLine_matrix1[1].split(",");
       
       for(String line:cacheList){
    	   
    	   String[] rowAndLine_matrix2 = line.toString().split("\t");
    	   //右侧矩阵line
    	   //格式: 列 tab 行_值,行_值,行_值,行_值
    	   String cloumn_matrix2 = rowAndLine_matrix2[0];
    	   String[] row_value_array_matrix2 = rowAndLine_matrix2[1].split(",");
    	   
    	   
	       //矩阵两位相乘得到的结果	
		   double result = 0;
    	   
    	   
    	   //遍历左侧矩阵一行的每一列
    	  for(String cloumn_value_matrix1:cloumn_value_array_matrix1){
    		  String cloumn_matrix1 = cloumn_value_matrix1.split("_")[0];
    		  String value_matrix1 = cloumn_value_matrix1.split("_")[1];
    		  
    		  //遍历右侧矩阵一行的每一列
    		  for(String cloumn_value_matrix2:row_value_array_matrix2){
    			  if(cloumn_value_matrix2.startsWith(cloumn_matrix1+"_")){
    				  String value_matrix2 = cloumn_value_matrix2.split("_")[1];
    				  //将两列的值相乘并累加
    				  result+= Double.valueOf(value_matrix1)*Double.valueOf(value_matrix2);
    				  
    			  }
    		  }
    	  }
    	  
    	  if(result==0){
    		  continue;
    	  }
    	  //result就是结果矩阵中的某个元素,坐标	行:row_matrix1 	列:row_matrix2(右侧矩阵已经被转置)
    	  outKey.set(row_matrix1);
    	  outValue.set(cloumn_matrix2+"_"+df.format(result));
    	  //输出格式为	key:行	value:列_值
    	  context.write(outKey, outValue);
       }
    } 
}

reducer4:

package step4;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:59
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 物品和物品的相似度矩阵X评分矩阵(经过步骤3转置)
 */
public class Reducer4 extends Reducer<Text, Text, Text, Text>{
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //	key:行 物品ID	value:列_值	用户ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();

		for(Text text:values){  
			sb.append(text+",");
        }
		
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	

		outKey.set(key);
		outValue.set(line);

		context.write(outKey,outValue);  
	}
	
}

mr4:

package step4;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:44:07
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 物品和物品的相似度矩阵X评分矩阵(经过步骤3转置)
 */
public class MR4 {
	private static String inputPath = "/ItemCF/step2_output";
	private static String outputPath = "/ItemCF/step4_output";
	//将step1中输出的转置矩阵作为全局缓存
	private static String cache="/ItemCF/step3_output/part-r-00000";
	
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step4");
		//如果未开启,使用 FileSystem.enableSymlinks()方法来开启符号连接。
		FileSystem.enableSymlinks();
		//要使用符号连接,需要检查是否启用了符号连接
		 boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
		 System.out.println(areSymlinksEnabled);
		//添加分布式缓存文件
		job.addCacheArchive(new URI(cache+"#itemUserScore2"));
		
	
		//配置任务map和reduce类  
		job.setJarByClass(MR4.class);  
		job.setJar("F:\\eclipseworkspace\\ItemCF\\ItemCF.jar");  
	      job.setMapperClass(Mapper4.class);  
	      job.setReducerClass(Reducer4.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}
	
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR4().run();
		if(result==1){
			System.out.println("step4运行成功");
		}else if(result==-1){
			System.out.println("step4运行失败");
		}
	  }
}

结果:


5.根据评分矩阵,将步骤4的输出中,用户已经有过行为的商品分值置零

输入:步骤4输出

缓存:步骤1输出

输出:用户ID(行)——物品ID(列)——分值(最终推荐列表)

代码:

mapper5:

package step5;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:51
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 
 * 根据评分矩阵,将步骤4输出中,用户已经有过行为的物品的分值置为零
 */
public class Mapper5 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	private List<String> cacheList = new ArrayList<String>();
			
	/**在map执行之前会执行这个方法,只会执行一次
	 * 
	 * 通过输入流将全局缓存中的矩阵读入一个java容器中
	 */
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
		super.setup(context);
		FileReader fr = new FileReader("itemUserScore3");
		BufferedReader br  = new BufferedReader(fr);
		
		//右矩阵	
		//key:行号 物品ID		value:列号_值,列号_值,列号_值,列号_值,列号_值...    用户ID_分值
		String line = null;
		while((line=br.readLine())!=null){
			cacheList.add(line);
		}
		
		fr.close();
		br.close();
	}


	/**
	 * key: 行号	物品ID
	 * value:行	列_值,列_值,列_值,列_值	用户ID_分值
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       
       String[] rowAndLine_matrix1 = value.toString().split("\t");
       
       //矩阵行号	物品ID
       String item_matrix1 = rowAndLine_matrix1[0];
       //列_值
       String[] user_score_array_matrix1 = rowAndLine_matrix1[1].split(",");
       
       for(String line:cacheList){
    	   
    	   String[] rowAndLine_matrix2 = line.toString().split("\t");
    	   //右侧矩阵line
    	   //格式: 列 tab 行_值,行_值,行_值,行_值
    	   String item__matrix2 = rowAndLine_matrix2[0];
    	   String[] user_score_array_matrix2 = rowAndLine_matrix2[1].split(",");
    	   
    	   
    	   
	       //矩阵两位相乘得到的结果	
		   //double result = 0;
    	   
    	   //如果物品ID物品相同
    	   if(item_matrix1.equals(item__matrix2)){
    		   
    		   //遍历matrix1的列
    		   for(String user_score_matrix1:user_score_array_matrix1){
    			   boolean flag = false;
    			   String user_matrix1 = user_score_matrix1.split("_")[0];
    			   String score_matrix1 = user_score_matrix1.split("_")[1];

    			   //遍历matrix2的列
    			   for(String user_score_matrix2:user_score_array_matrix2){
        			   String user_matrix2 = user_score_matrix2.split("_")[0];
        			   if(user_matrix1.equals(user_matrix2)){
        				   flag = true;
        			   }
    			   }
    			   //该用户没有对该物品产生行为
    			   if(flag==false){
    				   outKey.set(user_matrix1);
    				   outValue.set(item_matrix1+"_"+score_matrix1);
    				   //输出格式为	key:行	value:列_值
    				   context.write(outKey, outValue);
    			   }
    		   }
    	   }
    	  
       }
    } 
}

reducer5:

package step5;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:59
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据评分矩阵,将步骤4输出中,用户已经有过行为的物品的分值置为零
 */
public class Reducer5 extends Reducer<Text, Text, Text, Text>{
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //	key:行 物品ID	value:列_值	用户ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();

		for(Text text:values){  
			sb.append(text+",");
        }
		
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	

		outKey.set(key);
		outValue.set(line);

		context.write(outKey,outValue);  
	}
	
}

mr5:

package step5;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:44:07
 * @email 37024760@qq.com
 * @remark
 * @version 
 * 
 * 根据评分矩阵,将步骤4输出中,用户已经有过行为的物品的分值置为零
 */
public class MR5 {
	private static String inputPath = "/ItemCF/step4_output";
	private static String outputPath = "/ItemCF/step5_output";
	//将step1中输出的转置矩阵作为全局缓存
	private static String cache="/ItemCF/step1_output/part-r-00000";
	
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step5");
		//如果未开启,使用 FileSystem.enableSymlinks()方法来开启符号连接。
		FileSystem.enableSymlinks();
		//要使用符号连接,需要检查是否启用了符号连接
		 boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
		 System.out.println(areSymlinksEnabled);
		//添加分布式缓存文件
		job.addCacheArchive(new URI(cache+"#itemUserScore3"));
		
	
		//配置任务map和reduce类  
		job.setJarByClass(MR5.class);  
		job.setJar("F:\\eclipseworkspace\\ItemCF\\ItemCF.jar");  
	      job.setMapperClass(Mapper5.class);  
	      job.setReducerClass(Reducer5.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}
	
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR5().run();
		if(result==1){
			System.out.println("step5运行成功");
		}else if(result==-1){
			System.out.println("step5运行失败");
		}
	  }
}

结果:






】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop JOB的性能优化实践 下一篇hadoop运行环境安装与配置+hadoop..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目