Building on the earlier experience with MultipleOutputs, we can now parse the HDFS input directory path and derive the output path from it, which is a very common business requirement. Strictly speaking, this is not multi-file-name output: MultipleOutputs.addNamedOutput is called only once, with the output name set to "result".
To keep the computation re-runnable, the existing output directory must be deleted before each run.
First look at pom.xml. There is now only one argument, the input directory; the output directory is derived automatically by appending /output to that path.
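The path handling just described (strip a trailing slash from the input directory, then append /output) can be sketched as a small stand-alone snippet. Note that deriveOutputPath is an illustrative helper name of my choosing; LogJob below inlines the same logic in main():

```java
// Illustrative only: deriveOutputPath is a hypothetical helper name,
// not part of the actual LogJob code shown later.
public class OutputPathDemo {

    // Strip a trailing "/" from the input directory, then append "/output",
    // mirroring what LogJob does with args[0].
    static String deriveOutputPath(String inputPath) {
        if (inputPath.endsWith("/")) {
            inputPath = inputPath.substring(0, inputPath.length() - 1);
        }
        return inputPath + "/output";
    }

    public static void main(String[] args) {
        // Both forms map to the same output directory.
        System.out.println(deriveOutputPath("/user/chenshu/share/logs/"));
        System.out.println(deriveOutputPath("/user/chenshu/share/logs"));
    }
}
```

Either way the job is invoked, with or without a trailing slash, the output lands in the same subdirectory of the input folder.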
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.freebird</groupId>
  <artifactId>mr1_example3</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>mr1_example3</name>
  <url>http://maven.apache.org</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>hadoop</executable>
          <arguments>
            <argument>jar</argument>
            <argument>target/mr1_example3-1.0-SNAPSHOT.jar</argument>
            <argument>org.freebird.LogJob</argument>
            <argument>/user/chenshu/share/logs</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
LogJob.java has been modified. The main change uses the Path, FileSystem and Configuration classes together to delete the HDFS output directory if it already exists, and only one NamedOutput, named "result", is registered.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.freebird.mapper.LogMapper;
import org.freebird.reducer.LogReducer;

import java.io.IOException;

public class LogJob {

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        if (inputPath.endsWith("/")) {
            inputPath = inputPath.substring(0, inputPath.length() - 1);
        }
        System.out.println("args[0] indicates input folder path, the last / will be removed if it exists:" + inputPath);
        String outputPath = inputPath + "/output";
        System.out.println("output folder path is:" + outputPath);

        Configuration conf = new Configuration();
        Job job = new Job(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path path1 = new Path(inputPath);
        Path path2 = new Path(outputPath);
        recreateFolder(path2, conf);  // keep the job re-runnable

        // Only one named output, "result", is registered.
        MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class, Text.class, IntWritable.class);
        FileInputFormat.addInputPath(job, path1);
        FileOutputFormat.setOutputPath(job, path2);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Delete the existing output folder; Hadoop will create it again when the job runs.
    private static void recreateFolder(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);  // recursive delete; the single-argument delete(Path) is deprecated
        }
    }
}
The Reducer code also needs to be modified:
package org.freebird.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs<Text, IntWritable> outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed, or the named-output files may not be flushed.
        outputs.close();
    }