org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader {
?private FileSplit fileSplit;
?private FSDataInputStream fis;
?private Text key = null;
?private BytesWritable value = null;
?private boolean processed = false;
?@Override
?public void close() throws IOException {
? // TODO Auto-generated method stub
? // fis.close();
?}
?@Override
?public Text getCurrentKey() throws IOException, InterruptedException {
? // TODO Auto-generated method stub
? return this.key;
?}
?@Override
?public BytesWritable getCurrentValue() throws IOException,
? ?InterruptedException {
? // TODO Auto-generated method stub
? return this.value;
?}
?@Override
?public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
? ?throws IOException, InterruptedException {
? fileSplit = (FileSplit) inputSplit;
? Configuration job = tacontext.getConfiguration();
? Path file = fileSplit.getPath();
? FileSystem fs = file.getFileSystem(job);
? fis = fs.open(file);
?}
?@Override
?public boolean nextKeyValue() {
? if (key == null) {
? ?key = new Text();
? }
? if (value == null) {
? ?value = new BytesWritable();
? }
? if (!processed) {
? ?byte[] content = new byte[(int) fileSplit.getLength()];
? ?Path file = fileSplit.getPath();
? ?System.out.println(file.getName());
? ?key.set(file.getName());
? ?try {
? ? IOUtils.readFully(fis, content, 0, content.length);
? ? // value.set(content, 0, content.length);
? ? value.set(new BytesWritable(content));
? ?} catch (IOException e) {
? ? // TODO Auto-generated catch block
? ? e.printStackTrace();
? ?} finally {
? ? IOUtils.closeStream(fis);
? ?}
? ?processed = true;
? ?return true;
? }
? return false;
?}
?@Override
?public float getProgress() throws IOException, InterruptedException {
? // TODO Auto-generated method stub
? return processed ? fileSplit.getLength() : 0;
?}
}