设为首页 加入收藏

TOP

在Hadoop中处理输入的CSV文件
2019-02-09 00:42:59 】 浏览:138
Tags:Hadoop 处理 输入 CSV 文件


No Reply , Posted in Hadoop on December 2, 2012

在Hadoop中,InputFormat类用来生成可供Mapper处理的<key, value>键值对。当数据传送给Mapper时,Mapper会将输入分片传送到InputFormat上,InputFormat调用getRecordReader()方法生成RecordReader,RecordReader再创建可供map函数处理的键值对<K1, V1>。

Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的键值对。比如,TextInputFormat,Hadoop中默认的输入方法,会将每行数据生成一条记录,其中key值为每条记录在分片中的字节偏移量,value则为每行的内容。

在Hadoop预定义的InputFormat中,并没有处理CSV文件的方法。CSV文件的本质其实是用逗号分隔开的文本文件。一种很直观的处理方法是:将CSV文件作为文本文件处理,使用TextInputFormat将文件按行传入map函数,在map函数中再按照CSV文件的格式进行处理。但这样很容易将数据格式的处理逻辑与业务处理逻辑混淆在一起,并且出现很多copy-and-pasted的代码。

实际上,可以写一个自己的InputFormat以及RecordReader类,专门用来处理CSV文件的输入,直接传递给map函数解析后的数据。


1 数据结构


我们传递给map函数一个ArrayWritable(A Writable for arrays containing instances of a class),元素类型为Text,即CSV文件每一行各个字段的数据。数据结构如下:

代码1:TextArrayWritable.java


  1. public class TextArrayWritable extends ArrayWritable {
  2. public TextArrayWritable() {
  3. super(Text.class);
  4. }
  5. public TextArrayWritable(Text[] strings) {
  6. super(Text.class, strings);
  7. }
  8. }

2 CSVInputFormat


FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供了两个功能:一是定义哪些文件包含在一个作业的输入中,另一个是为输入文件生成分片(Input Splits)。而把分片分割成记录的事情交由其子类来完成。所以CSVInputFormat类的实现上,同样是继承InputFormat类,并只需要简单的重写createRecordReader和isSplitable即可。

代码2:CSVInputFormat.java


  1. public class CSVInputFormat
  2. extends FileInputFormat<LongWritable, TextArrayWritable>{
  3. public static final String CSV_TOKEN_SEPARATOR_CONFIG
  4. = "csvinputformat.token.delimiter";
  5. @Override
  6. protected boolean isSplitable(JobContext context, Path filename) {
  7. CompressionCodec codec =
  8. new CompressionCodecFactory(context.getConfiguration())
  9. .getCodec(filename);
  10. return codec == null;
  11. }
  12. @Override
  13. public RecordReader<LongWritable, TextArrayWritable> createRecordReader(
  14. InputSplit split, TaskAttemptContext context)
  15. throws IOException, InterruptedException {
  16. String csvDelimiter = context.getConfiguration()
  17. .get(CSV_TOKEN_SEPARATOR_CONFIG);
  18. Character separator = null;
  19. if (csvDelimiter != null && csvDelimiter.length() == 1) {
  20. separator = csvDelimiter.charAt(0);
  21. }
  22. return new CSVRecordReader(separator);
  23. }
  24. }

其中csvinputformat.token.delimiter是可在配置文件中配置的CSV输入文件分隔符,createRecordReader完成的工作只是从配置文件中得到分隔符,调用真正对CSV文件分片进行处理,并生成键值对的CSVRecordReader函数,并返回RecordReader对象。


3CSVRecordReader


对于CSVRecordReader,要实现的功能无非就是将CSV文件中每一行的各字段提取出来,并将各字段作为TextArrayWritable类型的数据结构传递给map函数。

在Hadoop中有一个LineRecordReader类,它将文本文件每一行的内容作为值返回,类型为Text。所以可以直接在CSVRecordReader中使用LineRecordReader,将LineRecordReader返回的每一行再次进行处理。在CSV文件的处理上,这里用到了OpenCSV对CSV文件的每一行进行解析,具体可参见这里。

下面是CSVRecordReader的实现代码。除了CSV文件的解析、nextKeyValue()方法和getCurrentValue()方法外,大部分方法都直接调用LineRecordReader实例的相应方法。毕竟我们是踩在巨人的肩膀上继续前进嘛。O(∩_∩)O~

代码3:CSVRecordReader.java


  1. public class CSVRecordReader
  2. extends RecordReader<LongWritable, TextArrayWritable> {
  3. private LineRecordReader lineReader;
  4. private TextArrayWritable value;
  5. private CSVParser parser;
  6. // 新建CSVParser实例,用来解析每一行CSV文件的每一行
  7. public CSVRecordReader(Character delimiter) {
  8. this.lineReader = new LineRecordReader();
  9. if (delimiter == null) {
  10. this.parser = new CSVParser();
  11. }
  12. else {
  13. this.parser = new CSVParser(delimiter);
  14. }
  15. }
  16. // 调用LineRecordReader的初始化方法,寻找分片的开始位置
  17. @Override
  18. public void initialize(InputSplit split, TaskAttemptContext context)
  19. throws IOException, InterruptedException {
  20. lineReader.initialize(split, context);
  21. }
  22. // 使用LineRecordReader来得到下一条记录(即下一行)。
  23. // 如果到了分片(Input Split)的尾部,nextKeyValue将返回NULL
  24. @Override
  25. public boolean nextKeyValue()
  26. throws IOException, InterruptedException {
  27. if (lineReader.nextKeyValue()) {
  28. //如果有新记录,则进行处理
  29. loadCSV();
  30. return true;
  31. }
  32. else {
  33. value = null;
  34. return false;
  35. }
  36. }
  37. @Override
  38. public LongWritable getCurrentKey() throws IOException,
  39. InterruptedException {
  40. return lineReader.getCurrentKey();
  41. }
  42. @Override
  43. public TextArrayWritable getCurrentValue() throws IOException,
  44. InterruptedException {
  45. return value;
  46. }
  47. @Override
  48. public float getProgress() throws IOException, InterruptedException {
  49. return lineReader.getProgress();
  50. }
  51. @Override
  52. public void close() throws IOException {
  53. lineReader.close();
  54. }
  55. // 对CSV文件的每一行进行处理
  56. private void loadCSV() throws IOException {
  57. String line = lineReader.getCurrentValue().toString();
  58. // 通过OpenCSV将解析每一行的各字段
  59. String[] tokens = parser.parseLine(line);
  60. value = new TextArrayWritable(convert(tokens));
  61. }
  62. // 将字符串数组批量处理为Text数组
  63. private Text[] convert(String[] tokens) {
  64. Text[] t = new Text[tokens.length];
  65. for (int i = 0; i < t.length; i++) {
  66. t[i] = new Text(tokens[i]);
  67. }
  68. return t;
  69. }
  70. }

4简单的应用


用于处理CSV文件输入的InputFormat已经写完了,现在构造一个简单的应用场景,来试验下这个CSVInputFormat。

假设有这样一些数据,每一列第一个字段为一个标识,后面为随机产生的数字,标识各不相同,求每一行标识后的数字之和并输出,输出格式为:每一行为标识和数字和。

由于标识没有重复,并且逻辑比较简单,这里只写一个Mapper即可,不需要Reducer。

代码4:CSVMapper.java


  1. public class CSVMapper
  2. extends Mapper<LongWritable, TextArrayWritable, Text, IntWritable> {
  3. @Override
  4. protected void map(LongWritable key, TextArrayWritable value, Context context)
  5. throws IOException, InterruptedException {
  6. String[] values = value.toStrings();
  7. int sum = 0;
  8. Text resultKey = new Text(values[0]);
  9. for (int i = 1; i < values.length; i++) {
  10. sum = sum + Integer.valueOf(values[i].trim());
  11. }
  12. IntWritable resultValue = new IntWritable(sum);
  13. context.write(resultKey, resultValue);
  14. }
  15. }

在作业的提交部分,由于没有Reducer,所以将ReduceTask设置为了0

代码5:JustRun.java


  1. public class JustRun extends Configured implements Tool{
  2. @Override
  3. public int run(String[] args) throws Exception {
  4. Configuration conf = new Configuration();
  5. Job job = new Job(conf);
  6. job.setJobName("CSVTest");
  7. job.setJarByClass(JustRun.class);
  8. job.setMapperClass(CSVMapper.class);
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(IntWritable.class);
  11. job.setInputFormatClass(CSVInputFormat.class);
  12. job.setNumReduceTasks(0);
  13. FileInputFormat.setInputPaths(job, new Path(args[0]));
  14. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  15. return job.waitForCompletion(true) 0 : 1;
  16. }
  17. public static void main(String[] args) throws Exception {
  18. int ret = ToolRunner.run(new JustRun(), args);
  19. System.exit(ret);
  20. }
  21. }

执行完毕后,输出如下,跟预想是一致的。

好了,这就是利用InputFormat对CSV文件的处理过程。除了CSV文件,还可根据处理数据的类型,写出更多的InputFormat。同时,我们还可以利用OutputFormat输出需要的格式。


转自

http://bukp.me/hadoop/work-with-csv-input-file-in-hadoop.html

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop 2.7.3 源码分析(三):ha.. 下一篇flume安装在hadoop记录

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目