This uses the max_temperature example from Hadoop: The Definitive Guide. Driving Hadoop from Java is much like doing it from C++; the differences are only at the language and API level. Three components are still needed: a class that extends Mapper, a class that extends Reducer, and a main program that drives the job. They can all live in one .java file or be split across three; here they are split into three. One notable difference when running the job: with C++ both the executable and the input are placed on HDFS, whereas with Java only the input goes to HDFS. The three .java files are compiled into class files and packed into a jar, which stays on the local filesystem rather than HDFS; an environment variable tells hadoop where to find the jar.
The map class, MaxTemperatureMapper.java. Hadoop provides its own set of basic types optimized for network serialization rather than using Java's built-in types; they live in the org.apache.hadoop.io package. LongWritable, IntWritable, and Text correspond to Java's Long, Integer, and String respectively.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/* Publicly extends Hadoop's Mapper class */
/* The four type parameters are the input key, input value, output key, and
output value: the input key is a long integer offset, the input value is a
line of text, the output key is a year, and the output value is a temperature */
/* All types are Hadoop's own rather than Java built-ins, suited to network
serialization */
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
/* Override the map method */
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/* The input key is unused; the input value is the text content, converted to a line string */
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
/* Write records that pass the filter to the context */
if (airTemperature != MISSING && quality.matches("[01459]")) {
/* The year goes out as Text and the temperature as an IntWritable, matching the type parameters */
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
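The fixed-width offsets used in the mapper can be sanity-checked with plain Java, outside Hadoop. The sketch below copies the mapper's parsing logic into a standalone class (RecordParseSketch is a name invented here for illustration; the record is the first line of the sample input shown later):

```java
// Standalone sketch of the mapper's fixed-width parsing; plain Java, no
// Hadoop dependency. The record string is copied from the sample input.
public class RecordParseSketch {
    static final int MISSING = 9999;

    // Returns "year<TAB>temperature" for a record that passes the quality
    // filter, or null for one that is filtered out.
    static String parse(String line) {
        String year = line.substring(15, 19);        // characters 15..18
        int airTemperature;
        if (line.charAt(87) == '+') {                // sign at position 87
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);     // quality code at 92
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            return year + "\t" + airTemperature;
        }
        return null;
    }

    public static void main(String[] args) {
        String rec = "0067011990999991950051507004+68750+023550FM-12+038299999"
                   + "V0203301N00671220001CN9999999N9+00001+99999999999";
        System.out.println(parse(rec)); // year 1950, temperature 0, quality 1
    }
}
```

The first sample record yields year "1950" with temperature 0; records carrying a '-' at position 87 take the five-character branch, e.g. "-0011" parses to -11.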
The reduce class, MaxTemperatureReducer.java:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
/* key is the input year and needs no processing; values holds all the temperatures for that year, to be reduced by a custom rule */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
/* Scan all the values and keep the maximum as the reducer's output */
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
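What the reducer does for a single key can likewise be sketched in plain Java: scan the grouped values and keep a running maximum. The list below stands in for the IntWritables that MapReduce groups under the year 1950 in the sample data (MaxSketch is a name invented here):

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the reduce step for one key: a running maximum over
// the values grouped under that key.
public class MaxSketch {
    static int maxOf(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        return maxValue;
    }

    public static void main(String[] args) {
        // Temperatures the mapper emits for 1950 from the sample input
        System.out.println(maxOf(Arrays.asList(0, 22, -11))); // prints 22
    }
}
```

Starting from Integer.MIN_VALUE means the loop works for any non-empty group without special-casing the first element; 22 matches the final job output for 1950.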
The main program that runs the job, MaxTemperature.java. The input path can be specified multiple times through repeated API calls, giving multi-path input; there can be only one output path.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
/* Set up the job, which tells Hadoop where to find the jar */
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("MaxTemperature");
/* Set the input and output paths */
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/* Specify which mapper and reducer classes to use */
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
/* Set the reduce output types; they must match the Reducer's type parameters */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
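As noted above, FileInputFormat.addInputPath can be called repeatedly to feed several inputs into one job, while FileOutputFormat.setOutputPath accepts exactly one output directory, which must not already exist when the job starts. A sketch of that driver fragment, assuming two hypothetical input directories /ncdc/1949 and /ncdc/1950 (job-configuration fragment only, not a standalone program):

```java
// Each addInputPath call appends one more path to the job's input list;
// the paths here are hypothetical.
FileInputFormat.addInputPath(job, new Path("/ncdc/1949"));
FileInputFormat.addInputPath(job, new Path("/ncdc/1950"));
// Exactly one output directory; the job fails if it already exists.
FileOutputFormat.setOutputPath(job, new Path("/ncdc/max_out"));
```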
Compile as follows; three class files are produced:
[root@master chapter2]# javac *.java
[root@master chapter2]# ls -all | grep class
-rw-r--r-- 1 root root 1392 Sep 26 18:09 MaxTemperature.class
-rw-r--r-- 1 root root 1876 Sep 26 18:09 MaxTemperatureMapper.class
-rw-r--r-- 1 root root 1687 Sep 26 18:09 MaxTemperatureReducer.class
Build the jar with the following command:
[root@master chapter2]# jar cvf MaxTemperature.jar *.class
added manifest
adding: MaxTemperature.class(in = 1392) (out= 792)(deflated 43%)
adding: MaxTemperatureMapper.class(in = 1876) (out= 805)(deflated 57%)
adding: MaxTemperatureReducer.class(in = 1687) (out= 717)(deflated 57%)
[root@master chapter2]# ls -all | grep jar
-rwxrwxrwx 1 root root 3078 Sep 26 18:16 MaxTemperature.jar
Upload the input data to HDFS. The text is as follows:
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
The upload command:
[root@master chapter2]# hdfs dfs -put sample.txt /max_temperature
Configure the environment: append the following to /etc/profile, then apply it with source /etc/profile:
export HADOOP_CLASSPATH=MaxTemperature.jar
Run the job with the command below. Note that it is issued from the directory containing MaxTemperature.jar; otherwise an absolute path is needed. The MaxTemperature argument must match the name of the class whose main method drives the job, and the last two arguments are the input path and the output path.
[root@master chapter2]# hadoop jar MaxTemperature.jar MaxTemperature /max_temperature/sample.txt output
The result lists each year with that year's maximum temperature (in tenths of a degree Celsius):
[root@master chapter2]# hdfs dfs -cat output/*
1949 111
1950 22