设为首页 加入收藏

TOP

如何通过Java程序提交yarn的MapReduce计算任务(二)
2015-02-02 14:50:48 来源: 作者: 【 】 浏览:53
Tags:如何 通过 Java 程序 提交 yarn MapReduce 计算 任务
ss);
? // job.setOutputKeyClass(NullWritable.class);
? // job.setOutputValueClass(Text.class);
? job.setNumReduceTasks(8);
?
?
? // 设置计算输入数据的路径
? for (int i = 1; i < args.length - 2; i++) {
? ?FileInputFormat.addInputPath(job, new Path(args[i]));
? }
? // args数组的最后一个元素为输出路径
? FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
? boolean flag = job.waitForCompletion(true);
? return flag;
?}
?
?@SuppressWarnings("static-access")
?public static void main(String[] args) throws ClassNotFoundException,
? ?IOException, InterruptedException {?
?
? String[] inputPaths = new String[] { "normalizeJob",
? ? "hdfs://192.168.168.101:9000/user/hduser/red1/",
? ? "hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
? ? "hdfs://192.168.168.101:9000/user/hduser/test" };
? GEMIMain test = new GEMIMain();
? boolean result = test.runJob(inputPaths);? ? ? ?
?}
}


以下为TextPair类


public class TextPair implements WritableComparable {
?private Text first;
?private Text second;


?public TextPair() {
? set(new Text(), new Text());
?}


?public TextPair(String first, String second) {
? set(new Text(first), new Text(second));
?}


?public TextPair(Text first, Text second) {
? set(first, second);
?}


?public void set(Text first, Text second) {
? this.first = first;
? this.second = second;
?}


?public Text getFirst() {
? return first;
?}


?public Text getSecond() {
? return second;
?}


?@Override
?public void write(DataOutput out) throws IOException {
? first.write(out);
? second.write(out);
?}


?@Override
?public void readFields(DataInput in) throws IOException {
? first.readFields(in);
? second.readFields(in);
?}


?@Override
?public int hashCode() {
? return first.hashCode() * 163 + second.hashCode();
?}


?@Override
?public boolean equals(Object o) {
? if (o instanceof TextPair) {
? ?TextPair tp = (TextPair) o;
? ?return first.equals(tp.first) && second.equals(tp.second);
? }
? return false;
?}


?@Override
?public String toString() {
? return first + "\t" + second;
?}
?
?@Override
?/**A.compareTo(B)
? * 如果比较相同,则比较结果为0
? * 如果A大于B,则比较结果为1
? * 如果A小于B,则比较结果为-1
? *
? */
?public int compareTo(TextPair tp) {
? int cmp = first.compareTo(tp.first);
? if (cmp != 0) {
? ?return cmp;
? }
? //此时实现的是升序排列
? return second.compareTo(tp.second);
?}
}


以下为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分


package web.hadoop;


import java.io.IOException;?


import org.apache.hadoop.fs.Path;?
import org.apache.hadoop.io.BytesWritable;?
import org.apache.hadoop.io.Text;?
import org.apache.hadoop.mapreduce.InputSplit;?
import org.apache.hadoop.mapreduce.JobContext;?
import org.apache.hadoop.mapreduce.RecordReader;?
import org.apache.hadoop.mapreduce.TaskAttemptContext;?
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class WholeFileInputFormat extends FileInputFormat {?
?
? ? @Override?
? ? public RecordReader createRecordReader(?
? ? ? ? ? ? InputSplit arg0, TaskAttemptContext arg1) throws IOException,?
? ? ? ? ? ? InterruptedException {?
? ? ? ? // TODO Auto-generated method stub?
? ? ? ? return new WholeFileRecordReader();?
? ? }?
?
? ? @Override?
? ? protected boolean isSplitable(JobContext context, Path filename) {?
? ? ? ? // TODO Auto-generated method stub?
? ? ? ? return false;?
? ? }?
}?


以下为WholeFileRecordReader类


package web.hadoop;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import

首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇用R画有图例的中国地图 下一篇QT使用QCustomPlot 绘制柱状图

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: