设为首页 加入收藏

TOP

如何通过Java程序提交yarn的MapReduce计算任务(一)
2015-02-02 14:50:48 来源: 作者: 【 】 浏览:54
Tags:如何 通过 Java 程序 提交 yarn MapReduce 计算 任务

由于项目需求,需要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码。


以下为MapReduce主程序,有几点需要提一下:


1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。


2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的不同,这里变为了,TextPair的格式为


3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用。


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;


import util.Utils;


public class GEMIMain {
?
?public GEMIMain(){
? job = null;
?}
?
?public Job job;
?public static class NamePartitioner extends
? ?Partitioner {
? @Override
? public int getPartition(TextPair key, BytesWritable value,
? ? int numPartitions) {
? ?return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
? }
?}


?/**
? * 分组设置类,只要两个TextPair的第一个key相同,他们就属于同一组。他们的Value就放到一个Value迭代器中,
? * 然后进入Reducer的reduce方法中。
? *
? * @author hduser
? *
? */
?public static class GroupComparator extends WritableComparator {
? public GroupComparator() {
? ?super(TextPair.class, true);
? }


? @Override
? public int compare(WritableComparable a, WritableComparable b) {
? ?TextPair t1 = (TextPair) a;
? ?TextPair t2 = (TextPair) b;
? ?// 比较相同则返回0,比较不同则返回-1
? ?return t1.getFirst().compareTo(t2.getFirst()); // 只要是第一个字段相同的就分成为同一组
? }
?}
?
?
?public? boolean runJob(String[] args) throws IOException,
? ?ClassNotFoundException, InterruptedException {
?
? Configuration conf = new Configuration();
? // 在conf中设置outputath变量,以在reduce函数中可以获取到该参数的值
? conf.set("outputPath", args[args.length - 1].toString());
? //设置HDFS中,每次任务生成产品的质量文件所在文件夹。args数组的倒数第二个原数为质量文件所在文件夹
? conf.set("qualityFolder", args[args.length - 2].toString());
? //如果在Server中运行,则需要获取web项目的根路径;如果以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件
? //MapReduceProgress mprogress = new MapReduceProgress();
? //String rootPath= mprogress.rootPath;
? String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
? conf.addResource(new Path(rootPath+"yarn-site.xml"));
? conf.addResource(new Path(rootPath+"core-site.xml"));
? conf.addResource(new Path(rootPath+"hdfs-site.xml"));
? conf.addResource(new Path(rootPath+"mapred-site.xml"));
? this.job = new Job(conf);
?
? job.setJobName("Job name:" + args[0]);
? job.setJarByClass(GEMIMain.class);


? job.setMapperClass(GEMIMapper.class);
? job.setMapOutputKeyClass(TextPair.class);
? job.setMapOutputValueClass(BytesWritable.class);
? // 设置partition
? job.setPartitionerClass(NamePartitioner.class);
? // 在分区之后按照指定的条件分组
? job.setGroupingComparatorClass(GroupComparator.class);


? job.setReducerClass(GEMIReducer.class);


? job.setInputFormatClass(WholeFileInputFormat.class);
? job.setOutputFormatClass(NullOutputFormat.cla

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇用R画有图例的中国地图 下一篇QT使用QCustomPlot 绘制柱状图

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: