在hadoop的框架中,刚入门我们维护好Mapper和Reducer两个类就可以实现倒排索引。作为练习可以下载20 Newsgroups数据 :http://qwone.com/~jason/20Newsgroups/。
这些文章是零散的,不适合在hadoop上跑,不过可以整合成一个或几个大文件或者抽出一小部分测试一下。
亲测:没整合,在hadoop上跑所有的文章19997篇,16g内存差点跑爆、、、
因此只是测试,为了省事Mapper、Reducer和Main全写到一个WordCount类里了。
1>worldcount是主要运行程序。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path path = new Path(args[0]);
// FileSystem fs = FileSystem.get(conf); //真分布式
FileSystem fs = path.getFileSystem(conf);//伪分布式
if (fs.exists(path)) {//遍历目录内文件,这里目录内还有一级,参数为目录路径
FileStatus[] fileStatus = fs.listStatus(path);
for (FileStatus fileStatus1 : fileStatus) {
FileStatus[] fileStatus2 = fs.listStatus(fileStatus1.getPath());
for (FileStatus fileStatu : fileStatus2) {
// System.out.println(fileStatu.getPath().toString());
FileInputFormat.addInputPath(job, fileStatu.getPath());
}
}
}
fs.close();
// FileInputFormat.addInputPath(job,new Path(args[0])); //单跑文件,参数为文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) 0 : 1);
}
注意文件系统的实例化方式,真分布和假分布式不一样,假分布式用真分布式会报错:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:9000/user/hadoop/input3, expected: file:///
相关连接:https://blog.csdn.net/huangjing_whlg/article/details/39341643
2>mapper,我使用了lucene进行分词。StopAnalyzer+PorterStemFilter,进行分词+词干提取。相关包可以在porn.xml依赖。注释的是普通分类器,可以都试一下比较结果。
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private FileSplit split;
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// StringTokenizer itr = new StringTokenizer(value.toString());
// while (itr.hasMoreTokens()) {
// word.set(itr.nextToken());
// context.write(word, one);
// }
try {
Analyzer analyzer = new StopAnalyzer();
TokenStream stream = analyzer.tokenStream("", new StringReader(value.toString()));
CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
stream = new PorterStemFilter(stream);
stream.reset();
split = (FileSplit) context.getInputSplit();
while (stream.incrementToken()) {
word.set(cta.toString());
context.write(word, one);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
结果帮你们比较了,使用普通分词器在左,普通分词+提取词干在右:如下图所见。
3>reduce,起简单计数功能。
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
4>设置参数:
这里跑的是20_Newspapers的一部分数据。下面是原码和porn.xml
WordCount:
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.StopAnalyzer;
import org.apache.lucene.analysis.en.PorterStemFilter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private FileSplit split;
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// StringTokenizer itr = new StringTokenizer(value.toString());
// while (itr.hasMoreTokens()) {
// word.set(itr.nextToken());
// context.write(word, one);
// }
try {
Analyzer analyzer = new StopAnalyzer();
TokenStream stream = analyzer.tokenStream("", new StringReader(value.toString()));
CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
stream = new PorterStemFilter(stream);
stream.reset();
split = (FileSplit) context.getInputSplit();
while (stream.incrementToken()) {
word.set(cta.toString() + " " + split.getPath().getName());
// word.set(cta.toString());//
context.write(word, one);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path path = new Path(args[0]);
// FileSystem fs = FileSystem.get(conf); //真分布式
FileSystem fs = path.getFileSystem(conf);//伪分布式
if (fs.exists(path)) {//遍历目录内文件,这里目录内还有一级,参数为目录路径
FileStatus[] fileStatus = fs.listStatus(path);
for (FileStatus fileStatus1 : fileStatus) {
FileStatus[] fileStatus2 = fs.listStatus(fileStatus1.getPath());
for (FileStatus fileStatu : fileStatus2) {
// System.out.println(fileStatu.getPath().toString());
FileInputFormat.addInputPath(job, fileStatu.getPath());
}
}
}
fs.close();
// FileInputFormat.addInputPath(job,new Path(args[0])); //单跑文件,参数为文件路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) 0 : 1);
}
}
porn.xml:
<xml version="1.0" encoding="UTF-8">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Test.hadoop2</groupId>
<artifactId>HadoopTest2</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-icu</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>1.0.13</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
<outputDirectory>./lib</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
结果:word+docid+tf
至此入门系列结束。俺也是刚入门。哈哈