实现mapreduce多文件自定义输出(三)

2014-11-24 08:46:47 · 作者: · 浏览: 2
ends RecordWriter {

private static final String utf8 = "UTF-8";//定义字符编码格式
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);//定义换行符
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;

//实现构造方法,出入输出流对象和分隔符
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}

public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}

private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}

/**
* 将mapreduce的key,value以自定义格式写入到输出流中
*/
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}

public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}

}
接着,你实现刚刚重写MultipleOutputFormat类中的generateFileNameForKeyValue方法自定义返回需要输出文件的名称,我这里是以key值中以逗号分割取第一个字段的值作为输出文件名,这样第一个字段值相同的会输出到一个文件中并以其值作为文件名。 www.2cto.com
[java]
public static class VVLogNameMultipleTextOutputFormat extends MultipleOutputFormat {

@Override
protected String generateFileNameForKeyValue(Text key, NullWritable value, Configuration conf) {
String sp[] = key.toString().split(",");
String filename = sp[1];
try {
Long.parseLong(sp[1]);
} catch (NumberFormatException e) {
filename = "000000000000";
}
return filename;
}


}


最后就是在job调用时设置了
Configuration conf = getConf();
Job job = new Job(conf);
job.setNumReduceTasks(12);
......
job.setMapperClass(VVEtlMapper.class);
job.setReducerClass(EtlReducer.class);
job.setOutputFormatClass(VVLogNameMultipleTextOutputFormat.class);//设置自定义的多文件输出类
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
FileOutputFormat.setCompressOutput(job, true);//设置输出结果采用压缩
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class); //设置输出结果采用lzo压缩
ok,这样你就完成了支持新的hadoop api自定义的多文件输出mapreduce编写。


作者:liuzhoulong