设为首页 加入收藏

TOP

如何通过Java程序提交yarn的MapReduce计算任务(三)
2015-02-02 14:50:48 来源: 作者: 【 】 浏览:52
Tags:如何 通过 Java 程序 提交 yarn MapReduce 计算 任务
org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class WholeFileRecordReader extends RecordReader {


?private FileSplit fileSplit;
?private FSDataInputStream fis;


?private Text key = null;
?private BytesWritable value = null;


?private boolean processed = false;


?@Override
?public void close() throws IOException {
? // TODO Auto-generated method stub
? // fis.close();
?}


?@Override
?public Text getCurrentKey() throws IOException, InterruptedException {
? // TODO Auto-generated method stub
? return this.key;
?}


?@Override
?public BytesWritable getCurrentValue() throws IOException,
? ?InterruptedException {
? // TODO Auto-generated method stub
? return this.value;
?}


?@Override
?public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
? ?throws IOException, InterruptedException {


? fileSplit = (FileSplit) inputSplit;
? Configuration job = tacontext.getConfiguration();
? Path file = fileSplit.getPath();
? FileSystem fs = file.getFileSystem(job);
? fis = fs.open(file);
?}


?@Override
?public boolean nextKeyValue() {


? if (key == null) {
? ?key = new Text();
? }


? if (value == null) {
? ?value = new BytesWritable();
? }


? if (!processed) {
? ?byte[] content = new byte[(int) fileSplit.getLength()];


? ?Path file = fileSplit.getPath();


? ?System.out.println(file.getName());
? ?key.set(file.getName());


? ?try {
? ? IOUtils.readFully(fis, content, 0, content.length);
? ? // value.set(content, 0, content.length);
? ? value.set(new BytesWritable(content));
? ?} catch (IOException e) {
? ? // TODO Auto-generated catch block
? ? e.printStackTrace();
? ?} finally {
? ? IOUtils.closeStream(fis);
? ?}


? ?processed = true;
? ?return true;
? }


? return false;
?}


?@Override
?public float getProgress() throws IOException, InterruptedException {
? // TODO Auto-generated method stub
? return processed ? fileSplit.getLength() : 0;
?}


}


首页 上一页 1 2 3 下一页 尾页 3/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇用R画有图例的中国地图 下一篇QT使用QCustomPlot 绘制柱状图

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: