Hadoop Classic Example: WordCount
1. Configure the Hadoop plugin for Eclipse

(1) Use SCP to upload eclipse-SDK-4.2.1-linux-gtk-x86_64.tar.gz and hadoop-eclipse-plugin-2.5.1.jar to CentOS.
(2) Create a new directory in HDFS with the hadoop command on CentOS:
Note: Hadoop must already be running before any hadoop commands are executed.

cd /home/
hadoop fs -mkdir /data01

(3) Use the put and cat commands to copy a file into HDFS and view it, as sketched below:
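
A minimal sketch of these commands, assuming an illustrative local file named word.txt and the /data01 directory created in step (2); adjust the names to your environment:

hadoop fs -put ./word.txt /data01/    # upload a local file (word.txt is illustrative) into HDFS
hadoop fs -cat /data01/word.txt       # print its contents from HDFS
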
(4) Use the mv command to move the archive to /home/gznc-hadoop:

mv /home/gznc-hadoop/Desktop/eclipse-SDK-4.2.1-linux-gtk-x86_64.tar.gz /home/gznc-hadoop/

(5) Extract the archive: tar -zxvf eclipse-SDK-4.2.1-linux-gtk-x86_64.tar.gz
(6) Copy hadoop-eclipse-plugin-2.5.1.jar into the eclipse/plugins/ directory:

cp /home/gznc-hadoop/hadoop-eclipse-plugin-2.5.1.jar /home/gznc-hadoop/eclipse/plugins/

(7) Start Eclipse: ./eclipse &
(8) Register Hadoop in Eclipse: Window -> Preferences -> Hadoop Map/Reduce -> select the Hadoop installation directory.
(9) Create a Map/Reduce project:
(10) The project structure looks like this:
The WordCountJob class is as follows:

package con.gznc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job wordCountJob = Job.getInstance(conf);

        // Important: specify the jar that contains this job
        wordCountJob.setJarByClass(WordCountJob.class);

        // Set the mapper class used by this job
        wordCountJob.setMapperClass(WordCountMapper.class);
        // Set the reducer class used by this job
        wordCountJob.setReducerClass(WordCountReducer.class);

        // Set the key/value types emitted by the map phase
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(IntWritable.class);

        // Set the key/value types of the final output
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(IntWritable.class);

        // Set the input path of the text to be processed and the output path
        FileInputFormat.setInputPaths(wordCountJob, args[0]);
        FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));

        // Submit the job to the Hadoop cluster and wait for it to finish
        wordCountJob.waitForCompletion(true);
    }
}
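
Optionally (this line is not in the original code), the boolean returned by waitForCompletion can be turned into the process exit code, which makes scripted runs easier to check:

System.exit(wordCountJob.waitForCompletion(true) ? 0 : 1);  // exit 0 on success, 1 on failure; replaces the bare call above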

The WordCountMapper class is as follows:

package con.gznc.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * KEYIN: the type of the key in the input key/value pairs
 * VALUEIN: the type of the value in the input key/value pairs
 * KEYOUT: the type of the key in the output key/value pairs
 * VALUEOUT: the type of the value in the output key/value pairs
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
     * The map method is called by the map task, which invokes our custom map
     * method once for every line of text it reads.
     * The parameters passed on each call are:
     *      the starting byte offset of the line (LongWritable) as the key
     *      the text content of the line (Text) as the value
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of text and convert it to a String
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");

        // Emit <word, 1> for each word
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
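
Splitting on a single space, as above, treats tabs and consecutive spaces as part of a word; an optional tweak (not in the original) is to split on runs of whitespace instead:

String[] words = line.split("\\s+");  // optional: split on any whitespace run rather than a single space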

The WordCountReducer class is as follows:

package con.gznc.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * KEYIN: matches the key type output by the mapper
 * VALUEIN: matches the value type output by the mapper
 * KEYOUT: the key type of the result pairs emitted by reduce
 * VALUEOUT: the value type of the result pairs emitted by reduce
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /*
     * The reduce method is called by the reduce task.
     *
     * The reduce task groups the key/value pairs delivered by the shuffle phase,
     * putting pairs with the same key into one group, and then calls our custom
     * reduce method once per group.
     * For example, given <hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>,
     * reduce is called once for the hello group and once for the tom group.
     * The parameters passed on each call are:
     *          key: the key shared by the group
     *          values: an iterator over all values in the group
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Define a counter
        int count = 0;
        // Iterate over all values in this group and accumulate them
        for (IntWritable value : values) {
            count += value.get();
        }

        // Emit the count for this word
        context.write(key, new IntWritable(count));
    }
}
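
Because this reducer only sums its values, the same class can typically also be registered as a combiner to cut down shuffle traffic; an optional one-line addition (not in the original) to WordCountJob's main method would be:

wordCountJob.setCombinerClass(WordCountReducer.class);  // optional: run the summing logic on the map side as well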

(11) Once the code is finished, export it as a jar:
Select the project to export and the export path, here /home/hadoop/myjar; keeping all future jars there makes them easier to manage.

(12) Prepare the word.txt file to be counted:
Under /home/hadoop/ create a directory named test_data to hold data that will be processed later.
Then create a word.txt file and fill it with some arbitrary English test data for Hadoop to count; the file can be edited with vim (see the sketch after the commands below).

mkdir test_data
cd test_data/
touch word.txt
vim word.txt
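
As an alternative to typing the content in vim, a minimal sketch of sample content (the words are arbitrary and purely illustrative):

echo "hello hadoop hello world hello tom" > word.txt    # hypothetical test data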

(13) Upload the data to HDFS:
First create two directories with the hadoop command to hold the data, then upload the file into the input directory with put:

hadoop fs -mkdir /user/hadoop
hadoop fs -mkdir /user/hadoop/input
hadoop fs -put ./test_data/word.txt /user/hadoop/input/

(14) Run the job
Run it from the exported jar:

hadoop jar ./myjar/HadoopCountWord.jar con.gznc.hadoop.WordCountJob /user/hadoop/input/word.txt /user/hadoop/output/
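
Note that FileOutputFormat refuses to run the job if the output directory already exists, so on a repeated run the old output has to be removed first (assuming the same paths as above):

hadoop fs -rm -r /user/hadoop/output    # remove the previous output directory, if any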

(15) View the result file

hadoop fs -cat /user/hadoop/output/part-r-00000
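
The output contains one word and its count per line, separated by a tab. With the hypothetical sample data from step (12), it would look roughly like:

hadoop	1
hello	3
tom	1
world	1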

(16) A common error and how to resolve it
The error message looks like this:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot delete /tmp/hadoop-yarn/staging/root/.staging/job_1524727489800_0001. Name node is in safe mode.
The reported blocks 0 needs additional 33 blocks to reach the threshold 0.9990 of total blocks 33.
The number of live datanodes 0 has reached the minimum number 0. Safe mode will be turned off automatically once the thresholds have been reached.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1272)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.deleteInternal(FSNamesystem.java:3521)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.deleteInt(FSNamesystem.java:3479)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3463)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:751)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:562)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

    at org.apache.hadoop.ipc.Client.call(Client.java:1411)
    at org.apache.hadoop.ipc.Client.call(Client.java:1364)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy9.delete(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy9.delete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:490)
    at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1726)
    at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:588)
    at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:584)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:584)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:443)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at con.gznc.hadoop.WordCountJob.main(WordCountJob.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

Solution:
Run the following command from the Hadoop root directory:

[root@master hadoop-2.5.1]# bin/hadoop dfsadmin -safemode leave
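
On newer Hadoop releases the hadoop dfsadmin form is deprecated in favour of hdfs dfsadmin, so the equivalent command is:

bin/hdfs dfsadmin -safemode leave    # same effect as the deprecated hadoop dfsadmin form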

Then re-run the job command.
