设为首页 加入收藏

TOP

Mapjoin和Reducejoin案例(一)
2019-09-17 19:02:13 】 浏览:77
Tags:Mapjoin Reducejoin 案例

一、Mapjoin案例

  1.需求:有两个文件,分别是订单表、商品表,

  订单表有三个属性分别为订单时间、商品id、订单id(表示内容量大的表),

  商品表有两个属性分别为商品id、商品名称(表示内容量小的表,用于加载到内存),

  要求结果文件为在订单表中的每一行最后添加商品id对应的商品名称。

  2.解决思路:

  将商品表加载到内存中,然后再map方法中将订单表中的商品id对应的商品名称添加到该行的最后,不需要Reducer,并在Driver执行类中设置setCacheFile和numReduceTask。

  3.代码如下:

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	HashMap<String, String> pdMap = new HashMap<>();
	//1.商品表加载到内存
	protected void setup(Context context) throws IOException {
		
		//加载缓存文件
		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8"));
		
		String line;
		
		while(StringUtils.isNotEmpty(line = br.readLine()) ) {
			
			//切分
			String[] fields = line.split("\t");
			
			//缓存
			pdMap.put(fields[0], fields[1]);
			
		}
		
		br.close();
	
	}
		
		
		
	//2.map传输
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//获取数据
		String line = value.toString();
		
		//切割
		String[] fields = line.split("\t");
		
		//获取订单中商品id
		String pid = fields[1];
		
		//根据订单商品id获取商品名
		String pName = pdMap.get(pid);
		
		//拼接数据
		line = line + "\t" + pName;
		
		//输出
		context.write(new Text(line), NullWritable.get());
	}
}

public class CacheDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		// 1.获取job信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2.获取jar包
		job.setJarByClass(CacheDriver.class);

		// 3.获取自定义的mapper与reducer类
		job.setMapperClass(CacheMapper.class);

		// 5.设置reduce输出的数据类型(最终的数据类型)
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 6.设置输入存在的路径与处理后的结果路径
		FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
		FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));
		
		//加载缓存商品数据
		job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));
		
		//设置一下reducetask的数量
		job.setNumReduceTasks(0);

		// 7.提交任务
		boolean rs = job.waitForCompletion(true);
		System.out.println(rs ? 0 : 1);
	}
}

  

二、Reducejoin案例

  1.需求:同上的两个数据文件,要求将订单表中的商品id替换成对应的商品名称。

  2.解决思路:封装TableBean类,包含属性:时间、商品id、订单id、商品名称、flag(flag用来判断是哪张表),

    使用Mapper读两张表,通过context对象获取切片对象,然后通过切片获取切片名称和路径的字符串来判断是哪张表,再将切片的数据封装到TableBean对象,最后以产品id为key、TableBean对象为value传输到Reducer端;

    Reducer接收数据后通过flag判断是哪张表,因为一个reduce中的所有数据的key是相同的,将商品表的商品id和商品名称读入到一个TableBean对象中,然后将订单表的中的数据读入到TableBean类型的ArrayList对象中,然后将ArrayList中的每个TableBean的商品id替换为商品名称,然后遍历该数组以TableBean为key输出。

  3.代码如下:

/**
 * @author: PrincessHug
 * @date: 2019/3/30, 2:37
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TableBean implements Writable {
    private String timeStamp;
    private String productId;
    private String orderId;
    private String productName;
    private String flag;

    public TableBean() {
    }

    public String getTimeStamp()
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hadoop的数据压缩 下一篇Hive的安装配置

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目