设为首页 加入收藏

TOP

java操作hadoop
2019-02-25 00:30:27 】 浏览:56
Tags:java 操作 hadoop

这里使用hadoop权威指南中max_temperature示例,使用java操作hadoop和c++类似,只是语言和api级别的差异,也需要3个组件:一个继承自Mapper的类,一个继承自Reducer的类,和作业处理的主流程。可以写在一个.java文件里面,也可以写在3个里面,这里写在3个java文件中。java和c++在运行作业的时候比较大的一个差异是,c++将可执行文件和input都放在hdfs,而java只将input放到hdfs,将3个java文件编译成class文件,打成jar包,jar包是不放在hdfs上,而是放在本地,通过配置环境变量指示hadoop找jar包的路径

map类:MaxTemperatureMapper.java,hadoop使用了一套用于网络序列化传输的基本类型,而不是java的内置类型,这些基本类型存放在org.apache.hadoop.io包中,LongWritable,IntWritable,Text分别对应java中的long,interger,string

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/* 公有继承自hadoop的Mapper类 */
/* 4个参数对应输入键,输入值,输出键,输出值,输入键是长整型偏移,输入值是一行文本,输出键是
 年份,输出值是气温 */
/* 使用的类型都是hadoop的类型而不是java内置的,适合网络序列化传输 */
public class MaxTemperatureMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 999;
    /* 重写map方法 */
    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        /* map输入的key用不着,输入值是文本内容,转换成一行字符 */
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        /* 将经过筛选的记录写入context */
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            /* 年份按照Text格式,气温是Int,与参数对应 */
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

reduce类MaxTemperatureReducer.java:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer 
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    /* key是输入的年份,不用处理,values是一个年份对应的气温,自定义规则处理 */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;

        /* 遍历所有value,找最大值,作为归约器的输出 */
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

执行作业主程序MaxTemperature.java,其中输入路径可以通过多次调用api多次指定,实现多路径输入,输出路径只能有一个

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <out path>");
            System.exit(-1);
        }

        /* 设置作业,用来指导hadoop获取jar包 */
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("MaxTemperature");
        
        /* 设置input路径和output路径 */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /* 指定要使用的mapper和reducer类型 */
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        /* 控制reduce函数的输出类型,要和Reduce类参数匹配 */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true)  0 : 1);
    }
}

编译命令如下,可以看到生成了3个class文件

[root@master chapter2]# javac *.java
[root@master chapter2]# ls -all | grep class
-rw-r--r-- 1 root root 1392 Sep 26 18:09 MaxTemperature.class
-rw-r--r-- 1 root root 1876 Sep 26 18:09 MaxTemperatureMapper.class
-rw-r--r-- 1 root root 1687 Sep 26 18:09 MaxTemperatureReducer.class

生成jar包,命令如下:

[root@master chapter2]# jar cvf MaxTemperature.jar *.class
added manifest
adding: MaxTemperature.class(in = 1392) (out= 792)(deflated 43%)
adding: MaxTemperatureMapper.class(in = 1876) (out= 805)(deflated 57%)
adding: MaxTemperatureReducer.class(in = 1687) (out= 717)(deflated 57%)
[root@master chapter2]# ls -all | grep jar
-rwxrwxrwx 1 root root 3078 Sep 26 18:16 MaxTemperature.jar

将input数据上传到hdfs,文本如下:

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

上传命令如下:

[root@master chapter2]# hdfs dfs -put sample.txt /max_temperature

配置环境变量,修改/etc/profile,增加如下内容,并通过命令source /etc/profile生效:

export HADOOP_CLASSPATH=Maxtemperature.jar

执行作业命令,注意进入MaxTemperature.jar所在目录,否则需要提供绝对路径,后面的参数MaxTemperature必须和作业执行的class文件名一致,最后两个参数是input所在路径和output路径

[root@master chapter2]# hadoop jar MaxTemperature.jar MaxTemperature /max_temperature/sample.txt output

执行结果如下,输出年和对应年的气温最高值

[root@master chapter2]# hdfs dfs -cat output/*
1949	111
1950	22

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop权威指南中文第四版(高清) 下一篇结合案例讲解MapReduce重要知识点..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目