实现mapreduce多文件自定义输出(二)

2014-11-24 08:46:47 · 作者: · 浏览: 1
public class MultiRecordWriter extends RecordWriter {
/** RecordWriter的缓存 */
private HashMap> recordWriters = null;
private TaskAttemptContext job = null;
/** 输出目录 */
private Path workPath = null;


public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
super();
this.job = job;
this.workPath = workPath;
recordWriters = new HashMap>();
}


@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
Iterator> values = this.recordWriters.values().iterator();
while (values.hasNext()) {
values.next().close(context);
}
this.recordWriters.clear();
}


@Override
public void write(K key, V value) throws IOException, InterruptedException {
// 得到输出文件名
String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
//如果recordWriters里没有文件名,那么就建立。否则就直接写值。
RecordWriter rw = this.recordWriters.get(baseName);
if (rw == null) {
rw = getBaseRecordWriter(job, baseName);
this.recordWriters.put(baseName, rw);
}
rw.write(key, value);
}


// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
private RecordWriter getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,
InterruptedException {
Configuration conf = job.getConfiguration();
//查看是否使用解码器
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = ",";
RecordWriter recordWriter = null;
if (isCompressed) {
Class< extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
Path file = new Path(workPath, baseName + codec.getDefaultExtension());
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
//这里我使用的自定义的OutputFormat
recordWriter = new LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator);
} else {
Path file = new Path(workPath, baseName);
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
//这里我使用的自定义的OutputFormat
recordWriter = new LineRecordWriter(fileOut, keyValueSeparator);
}
return recordWriter;
}
}


}
接着你还需要自定义一个LineRecordWriter类,它继承记录写入器RecordWriter,用来自定义输出格式。
[java]
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
*
* 重新构造实现记录写入器RecordWriter类
* Created on 2012-07-08
* @author zhoulongliu
* @param <K> 键的类型
* @param <V> 值的类型
*/
public class LineRecordWriter ext