设为首页 加入收藏

TOP

MapReduce 编程 系列八 根据输入路径产生输出路径和清除HDFS目录(一)
2015-07-20 17:34:04 来源: 作者: 【 】 浏览:5
Tags:MapReduce 编程 系列 根据 输入 路径 产生 输出 清除 HDFS 目录

有了前面的MultipleOutputs的使用经验,就可以将HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的。这样其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result.

同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除。

先看pom.xml, 现在参数只有一个输入目录了,输出目录会在该路径后面自动加上/output.

  
  
   
    4.0.0
   
  
   
    org.freebird
   
  
   
    mr1_example3
   
  
   
    jar
   
  
   
    1.0-SNAPSHOT
   
  
   
    mr1_example3
   
  
   
    http://maven.apache.org
   
  
    
     
     
      org.apache.hadoop
      
     
      hadoop-core
      
     
      1.2.1
      
     
   
  
    
     
      
      
       org.codehaus.mojo
       
      
       exec-maven-plugin
       
      
       1.3.2
       
       
        
         
         
          exec
          
         
        
       
       
       
        hadoop
        
        
        
         jar
         
        
         target/mr1_example3-1.0-SNAPSHOT.jar
         
        
         org.freebird.LogJob
         
        
         /user/chenshu/share/logs
         
        
       
      
     
   

  

LogJob.java做了修改,主要用Path, FileSystem和Configuration三个类配合,删除HDFS已经存在的目录。

并且只设置了一个NamedOutput,名为result.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.freebird.reducer.LogReducer;
import org.freebird.mapper.LogMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;



public class LogJob {                                                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                  
    public static void main(String[] args) throws Exception {                                                                                                                                                                                                                     
        String inputPath = args[0];                                                                                                                                                                                                                                               
        if (inputPath.endsWith("/")) {                                                                                                                                                                                                                                            
            inputPath = inputPath.substring(0, inputPath.length() -1);                                                                                                                                                                                                            
        }                                                                                                                                                                                                                                                                         
        System.out.println("args[0] indicates input folder path, the last / will be removed if it exists:" + inputPath);                                                                                                                                                          
        String outputPath = inputPath + "/output";                                                                                                                                                                                                                                
        System.out.println("output folder path is:" + outputPath);                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                  
        Configuration conf = new Configuration();                                                                                                                                                                                                                                 
        Job job = new Job(conf, "sum_did_from_log_file");                                                                                                                                                                                                                         
        job.setJarByClass(LogJob.class);                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                                  
        job.setMapperClass(org.freebird.mapper.LogMapper.class);                                                                                                                                                                                                                  
        job.setReducerClass(org.freebird.reducer.LogReducer.class);                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                                  
        job.setOutputKeyClass(Text.class);                                                                                                                                                                                                                                        
        job.setOutputValueClass(IntWritable.class);                                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                                  
        Path path1 = new Path(inputPath);                                                                                                                                                                                                                                         
        Path path2 = new Path(outputPath);                                                                                                                                                                                                                                        
                                                                                                                                                                                                                                                                                  
        recreateFolder(path2, conf);                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                                  
        MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class, Text.class, IntWritable.class);                                                                                                                                                                     
                                                                                                                                                                                                                                                                                  
        FileInputFormat.addInputPath(job, path1);                                                                                                                                                                                                                                 
        FileOutputFormat.setOutputPath(job, path2);                                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                                  
        System.exit(job.waitForCompletion(true) ? 0 : 1);                                                                                                                                                                                                                         
    }                                                                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                  
    private static void recreateFolder(Path path, Configuration conf) throws IOException {                                                                                                                                                                                        
        FileSystem fs = path.getFileSystem(conf);                                                                                                                                                                                                                                 
        if (fs.exists(path)) {                                                                                                                                                                                                                                                    
            fs.delete(path);                                                                                                                                                                                                                                                      
        }                                                                                                                                                                                                                                                                         
    }                                                                                                                                                                                                                                                                             
}

Reduce代码也需要修改:

package org.freebird.reducer;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;


public class LogReducer extends Reducer
  
    {

    private MultipleOutputs outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs(context);
    }

    @Override
    public void cleanup(Context conte
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇hdu 4412 Sky Soldiers(区间DP) 下一篇Codeforces 86C Genetic engineer..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·在 Redis 中如何查看 (2025-12-26 03:19:03)
·Redis在实际应用中, (2025-12-26 03:19:01)
·Redis配置中`require (2025-12-26 03:18:58)
·Asus Armoury Crate (2025-12-26 02:52:33)
·WindowsFX (LinuxFX) (2025-12-26 02:52:30)