一、Mapjoin案例
1.需求:有两个文件,分别是订单表、商品表,
订单表有三个属性分别为订单时间、商品id、订单id(表示内容量大的表),
商品表有两个属性分别为商品id、商品名称(表示内容量小的表,用于加载到内存),
要求结果文件为在订单表中的每一行最后添加商品id对应的商品名称。
2.解决思路:
将商品表加载到内存中,然后再map方法中将订单表中的商品id对应的商品名称添加到该行的最后,不需要Reducer,并在Driver执行类中设置setCacheFile和numReduceTask。
3.代码如下:
public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ HashMap<String, String> pdMap = new HashMap<>(); //1.商品表加载到内存 protected void setup(Context context) throws IOException { //加载缓存文件 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8")); String line; while(StringUtils.isNotEmpty(line = br.readLine()) ) { //切分 String[] fields = line.split("\t"); //缓存 pdMap.put(fields[0], fields[1]); } br.close(); } //2.map传输 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切割 String[] fields = line.split("\t"); //获取订单中商品id String pid = fields[1]; //根据订单商品id获取商品名 String pName = pdMap.get(pid); //拼接数据 line = line + "\t" + pName; //输出 context.write(new Text(line), NullWritable.get()); } } public class CacheDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // 1.获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.获取jar包 job.setJarByClass(CacheDriver.class); // 3.获取自定义的mapper与reducer类 job.setMapperClass(CacheMapper.class); // 5.设置reduce输出的数据类型(最终的数据类型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 6.设置输入存在的路径与处理后的结果路径 FileInputFormat.setInputPaths(job, new Path("c://table1029//in")); FileOutputFormat.setOutputPath(job, new Path("c://table1029//out")); //加载缓存商品数据 job.addCacheFile(new URI("file:///c:/inputcache/pd.txt")); //设置一下reducetask的数量 job.setNumReduceTasks(0); // 7.提交任务 boolean rs = job.waitForCompletion(true); System.out.println(rs ? 0 : 1); } }
二、Reducejoin案例
1.需求:同上的两个数据文件,要求将订单表中的商品id替换成对应的商品名称。
2.解决思路:封装TableBean类,包含属性:时间、商品id、订单id、商品名称、flag(flag用来判断是哪张表),
使用Mapper读两张表,通过context对象获取切片对象,然后通过切片获取切片名称和路径的字符串来判断是哪张表,再将切片的数据封装到TableBean对象,最后以产品id为key、TableBean对象为value传输到Reducer端;
Reducer接收数据后通过flag判断是哪张表,因为一个reduce中的所有数据的key是相同的,将商品表的商品id和商品名称读入到一个TableBean对象中,然后将订单表的中的数据读入到TableBean类型的ArrayList对象中,然后将ArrayList中的每个TableBean的商品id替换为商品名称,然后遍历该数组以TableBean为key输出。
3.代码如下:
/** * @author: PrincessHug * @date: 2019/3/30, 2:37 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class TableBean implements Writable { private String timeStamp; private String productId; private String orderId; private String productName; private String flag; public TableBean() { } public String getTimeStamp()