Think of this as the join query in a relational database: when there is a lot of data and the records in several data files are related to one another, you can combine them with a reduce-side join. As a very simple example, take a Customer.txt file that holds only customer information and an Order.txt file whose records are associated with those customers; we want to combine the information from the two files. The schematic is as follows:
[Figure: schematic of joining Customer.txt and Order.txt on the customer id]
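Since the screenshots of the two files did not survive, here is a small hypothetical sample in the comma-separated, customer-id-first layout that the code below assumes (the names, amounts, and dates are made up purely for illustration):

Customer.txt (customer id, name, phone):
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890

Order.txt (customer id, order id, price, date):
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007

A reduce-side join on the customer id lines the matching records up, e.g. for customer 1:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008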
A few technical terms come up here: group key, data source, and tag. The group key, loosely speaking, plays the role that the primary key/foreign key pair plays in a relational database: it is the id on which the join is made. The data source, as the name suggests, is where the data comes from, which here means Customers and Orders. The tag is also easy to understand: it records which file a record's fields actually belong to.
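To make that concrete with the hypothetical sample above: for the Order.txt record 1,B,88.25,20-May-2008, the group key is 1 (the customer id), the data source is Order.txt, and the tag is the file name, because the mapper below implements generateInputTag to tag every record with the file it came from.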
A reduce-side join needs the hadoop-datajoin-2.6.0.jar package; its default path is:
E:\hadoop-2.6.0\share\hadoop\tools\lib (under the Hadoop installation directory).
It uses three classes:
1. DataJoinMapperBase
2. DataJoinReducerBase
3. TaggedMapOutput
The working principle, a little more formally:
1. After input, the mapper wraps each piece of data in a TaggedMapOutput, a type that packages the data source (tag) together with the value (value);
2. What the map phase emits is therefore no longer a plain piece of data but a record: record = data source (tag) + data value (value).
3. combine() receives a combination: values from different data sources that share the same group key (see the sketch after the file listings below);
4. Each record from a given data source appears in only one combine() call;
Good. With all that understood, we can move on to the coding stage.
Here I write the several classes together in one file for testing, which gives it a rather different feel.
Customer.txt before the join:
[Figure: contents of Customer.txt — https://www.cppentry.com/upload_files/article/57/1_83ncn__.png]
Order.txt before the join:
[Figure: contents of Order.txt]
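Those four principles funnel into a single reducer-side hook, DataJoinReducerBase.combine(). As a point of reference before the full listing, here is a minimal sketch of an inner-join combine(); the signature is the one the contrib datajoin package declares, while the body and the TaggedWritable wrapper (the TaggedMapOutput subclass this pattern defines) are assumptions in the spirit of the standard example:

// Called once per combination of records that share a group key,
// with values[i] being the record tagged by tags[i].
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    if (tags.length < 2) return null; // key missing from one source: drop it (inner join)
    String joined = "";
    for (int i = 0; i < values.length; i++) {
        if (i > 0) joined += ",";
        TaggedWritable tw = (TaggedWritable) values[i]; // TaggedWritable is assumed, see above
        String line = ((Text) tw.getData()).toString();
        joined += line.split(",", 2)[1]; // strip the repeated customer id
    }
    TaggedWritable result = new TaggedWritable(new Text(joined));
    result.setTag((Text) tags[0]);
    return result;
}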
Test code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DataJoin extends Configured implements Tool {
    // DataJoinMapperBase is not importable out of the box; add the jar from
    // E:\hadoop-2.6.0\share\hadoop\tools\lib to the build path
    public static class MapClass extends DataJoinMapperBase {
        // Produce the group key: the first comma-separated field, i.e. the customer id
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }
        /*
         * Called once when the task starts, to produce the tag;
         * here we simply use the input file name as the tag
         */
        @Override
        protected Text generateInputTag(String inputFile) {
            return new Text(inputFile);
        }
        // Return a TaggedMapOutput carrying whatever Text tag we want