设为首页 加入收藏

TOP

Hadoop之Reduce侧的联结(二)
2015-11-21 01:37:22 来源: 作者: 【 】 浏览:1
Tags:Hadoop Reduce 联结
签的TaggedWritable @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv=new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class Reduce extends DataJoinReducerBase{ // 两个参数数组大小一定相同,并且最多等于数据源个数 @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { if(tags.length<2){ return null;// 这一步,实现内联结 } String joinedStr=""; for(int i=0;i 0){ joinedStr+=",";// 以逗号作为原两个数据源记录链接的分割符 TaggedWritable tw=(TaggedWritable)values[i]; String line=((Text)tw.getData()).toString(); String[] tokens=line.split(",",2);// 将一条记录划分两组,去掉第一组的组键名。 joinedStr+=tokens[1]; } } TaggedWritable retv=new TaggedWritable(new Text(joinedStr)); retv.setTag((Text)tags[0]); return retv;// 这只retv的组键,作为最终输出键。 } } /*TaggedMapOutput是一个抽象数据类型,封装了标签与记录内容 此处作为DataJoinMapperBase的输出值类型,需要实现Writable接口,所以要实现两个序列化方法 自定义输入类型*/ public static class TaggedWritable extends TaggedMapOutput{ private Writable data; //如果不给其一个默认的构造方法,Hadoop的使用反射来创建这个对象,需要一个默认的构造函数(无参数) public TaggedWritable(){ } public TaggedWritable(Writable data){ //TODO 这里可以通过setTag()方法进行设置 this.tag=new Text(""); this.data=data; } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入以下的代码.避免出现空指针异常,当时一定要在其写的时候加入out.writeUTF(this.data.getClass().getName()); //不然会出现readFully错误 String temp=in.readUTF(); if(this.data==null||!this.data.getClass().getName().equals(temp)){ try { this.data=(Writable)ReflectionUtils.newInstance(Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public Writable getData() { return data; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 组件配置是由Hadoop的Configuration的一个实例实现 JobConf job = new JobConf(conf, DataJoin.class); Path in=new Path("hdfs://master:9000/user/input/yfl/*.txt"); Path out=new Path("hdfs://master:9000/user/output/testfeng1"); FileSystem fs=FileSystem.get(conf); //通过其命令来删除输出目录 if(fs.exists(out)){ fs.delete(out,true); } //TODO这里注意别导错包了 FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job); } }

运行的结果:这里写图片描述

为了让调试更加的方便,在程序中直接使用delete命令已达到删除输出目录的功能,省去每次都要手动删除的麻烦,这里需要在我们的工程目录下面的bin目录下面添加主机的core-site.xml和hdfs-site.xml文件,然后给对于的目录赋上权限chmod -R 777 xxx,即可。
这里写图片描述
hadoop很有意思,我希望自己能走的更远!!!坚持,加油!!!

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇事务的ACID 下一篇数据库操作--创建索引

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: