设为首页 加入收藏

TOP

Hadoop 分区案例(根据不同的值分到不同文件)
2019-04-14 12:44:09  浏览:15
Tags:Hadoop 分区 案例 根据 不同 文件

主程序代码

package com.mapreduce;

import java.io.IOException;

import javax.imageio.stream.FileImageInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mapreduce.ModelDemo.MyMapper;
import com.mapreduce.ModelDemo.MyReduce;

public class PartitonerDemo implements Tool {

	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// 1、从输入数据中获取每一个文件中的每一行的值
			String line = value.toString();
			// 2、对每一行的数据进行切分(看情况)
			String[] words = line.split(" ");
			// 3、循环处理
			for (String word : words) {
				value.set(1+"");
				// map阶段的输出
				context.write(new Text(word), value);
			}
		}

	}

	public static class MyReduce extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text value, Iterable<Text> list, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (Text i : list) {
				count += Integer.parseInt(i.toString());
			}
			context.write(value, new Text(String.valueOf(count)));
		}

	}

	public static void main(String[] args) throws Exception {
		int isok = ToolRunner.run(new PartitonerDemo(), args);
		// 退出整个job
		System.exit(isok);
		
	}

	public void setConf(Configuration conf) {
		// TODO Auto-generated method stub
			conf.set("fs.defaultFS", "hdfs://zwj");
			conf.set("dfs.nameservices", "zwj");
			conf.set("dfs.ha.namenodes.zwj", "nn1,nn2");
			conf.set("dfs.namenode.rpc-address.zwj.nn1","hadoop01:9000");
			conf.set("dfs.namenode.rpc-address.zwj.nn2","hadoop02:9000");
			conf.set("dfs.client.failover.proxy.provider.zwj", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
	}

	public Configuration getConf() {
		// TODO Auto-generated method stub
		return new Configuration();
	}

	public int run(String[] args) throws Exception {
		// TODO Auto-generated method stub
		Configuration conf = getConf();
		Job job = Job.getInstance(conf,"job");
		job.setJarByClass(PartitonerDemo.class);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		//设置Partitioner的Class -> 值是自定义MyPartitioner.class MyPartitioner需要extendsPartitioner<Text, Text>
		job.setPartitionerClass(MyPartitioner.class);
		job.setNumReduceTasks(4);
		
		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		setInputAndOutput(job, conf, args);
		return  (job.waitForCompletion(true) 0 : 1);
	}

	private void setInputAndOutput(Job job, Configuration conf, String[] args) throws Exception{
		if(args.length!=2) {
			System.out.println("数据格式不正确");
			return ;
		}
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileSystem fs = FileSystem.get(conf);
		Path outPath = new Path(args[1]);
		if(fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		FileOutputFormat.setOutputPath(job, outPath);
	}

}

分区类的代码

package com.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Assigns map output records to a partition according to the first character
 * of the key: uppercase ASCII letter, lowercase ASCII letter, ASCII digit, or
 * anything else (including an empty key).
 */
public class MyPartitioner extends Partitioner<Text, Text> {

	private static final int UPPERCASE_BUCKET = 0;
	private static final int LOWERCASE_BUCKET = 1;
	private static final int DIGIT_BUCKET = 2;
	private static final int OTHER_BUCKET = 3;

	/**
	 * Chooses the partition for one record.
	 *
	 * @param key           the map output key whose first character selects the bucket
	 * @param value         the map output value (not used for partitioning)
	 * @param numPartitions the number of partitions; the bucket index is reduced
	 *                      modulo this value so the result is always in range
	 * @return 0 for keys starting with A-Z, 1 for a-z, 2 for 0-9 and 3 otherwise,
	 *         each taken modulo {@code numPartitions}
	 */
	@Override
	public int getPartition(Text key, Text value, int numPartitions) {
		String text = key.toString();
		int bucket = text.isEmpty() ? OTHER_BUCKET : bucketFor(text.charAt(0));
		return bucket % numPartitions;
	}

	/**
	 * Classifies a single character. Plain range checks are used instead of
	 * regular expressions because this runs once per record.
	 */
	private static int bucketFor(char first) {
		if (first >= 'A' && first <= 'Z') {
			return UPPERCASE_BUCKET;
		}
		if (first >= 'a' && first <= 'z') {
			return LOWERCASE_BUCKET;
		}
		if (first >= '0' && first <= '9') {
			return DIGIT_BUCKET;
		}
		return OTHER_BUCKET;
	}

}


编程开发网
【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部】
上一篇Hadoop编译环境组件搭建 下一篇认识的误区:Hadoop=云计算

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }