Writing files from HDFS into MySQL with DBConfiguration and DBOutputFormat
Copyright notice: this is an original post by the author; do not repost without permission. https://blog.csdn.net/zhoudetiankong/article/details/17027917

This post shows how to write a file stored in HDFS into a MySQL database using DBConfiguration and DBOutputFormat.

1. File format in HDFS (each line holds a name and an id separated by a tab, which is what the reducer splits on):

name1 1

name2 2

name3 3

2. Schema of the database table test

Two columns:

name (a String) and id (an int)
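For reference, the table can be created with a short JDBC snippet like the one below. This is a minimal sketch, not from the original post: the VARCHAR length and column types are assumptions (the post only says "String name, int id"), and the connection details are carried over from the driver code further down.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTestTable {
	public static void main(String[] args) throws Exception {
		// Connection details copied from the job below; adjust to your setup.
		try (Connection conn = DriverManager.getConnection(
				"jdbc:mysql://192.168.1.10:3306/samples", "root", "1234");
			 Statement stmt = conn.createStatement()) {
			// Column types are an assumption; the post only specifies "String name, int id".
			stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (name VARCHAR(64), id INT)");
		}
	}
}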

3. DBConfiguration setup

DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.10:3306/<database>", "<user>", "<password>");

job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "<table>", "<column1>", "<column2>");

4. Write a custom class, DataTable, that implements Writable and DBWritable so the record can be serialized (Writable for Hadoop's shuffle, DBWritable for the JDBC write).

5. In my testing, the record must be written to the database as the output key of the reduce phase, or the job fails. I originally wanted to write it out as the map output key, but that did not work, and I have not figured out why; corrections are welcome.
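The behavior in point 5 follows from how DBOutputFormat writes records: its internal record writer hands the PreparedStatement to the output key and ignores the value entirely, so the DBWritable object has to travel as the key. Below is a simplified sketch (not verbatim Hadoop source) of what the record writer's write() method does:

// Simplified sketch of the write() in DBOutputFormat's record writer.
public void write(K key, V value) throws IOException {
	try {
		key.write(statement);  // the KEY fills in the INSERT parameters; the VALUE is ignored
		statement.addBatch();  // rows are batched and executed when the writer closes
	} catch (SQLException e) {
		throw new IOException(e);
	}
}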

The full code follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class TextToMysql {

	public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
	{
		Configuration conf=new Configuration();
		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.10:3306/samples", "root", "1234");
		
		Job job=new Job(conf,"mysql");
		job.setJarByClass(TextToMysql.class);
		
		job.setMapperClass(map.class);
		job.setReducerClass(reduce.class);
		
		job.setOutputKeyClass(DataTable.class);
		job.setOutputValueClass(Text.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
			
		
		job.setOutputFormatClass(DBOutputFormat.class);
		DBOutputFormat.setOutput(job, "test", "name","id");
		
		
		FileInputFormat.addInputPath(job,new Path("hdfs://192.168.1.245:9000/test"));
		
		job.waitForCompletion(true);
		
	}
	
	public static class map extends Mapper<LongWritable,Text,Text,Text>
	{
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Emit the whole input line as the key; the value is unused.
			context.write(value, new Text(""));
		}
		}	
	}
	
	public static class reduce extends Reducer<Text,IntWritable,DataTable,Text>
	{

		//@Override
		protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String args[] = key.toString().split("\t");
			DataTable da = new DataTable();
			da.setName(args[0]);
			da.setId(Integer.parseInt(args[1]));
		    context.write(da, new Text(""));
		}
	}
	
	public static class DataTable implements Writable, DBWritable
	{
		private int id;
		private String name;

		public DataTable() {}

		public int getId() {
			return id;
		}

		public void setId(int id) {
			this.id = id;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		@Override
		public void write(PreparedStatement statement) throws SQLException {
			// Fill the INSERT statement; parameter positions match the columns
			// given in DBOutputFormat.setOutput(job, "test", "name", "id").
			statement.setString(1, this.name);
			statement.setInt(2, this.id);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			// Read a row back from the database (used by DBInputFormat; not exercised in this job).
			this.name = resultSet.getString(1);
			this.id = resultSet.getInt(2);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			// Hadoop serialization, used when the object crosses the shuffle.
			out.writeInt(this.id);
			Text.writeString(out, this.name);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// Hadoop deserialization; must mirror write(DataOutput) field for field.
			this.id = in.readInt();
			this.name = Text.readString(in);
		}
		
		
	}
	
}
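To run the job, package the class into a jar and submit it with hadoop jar. Note that the MySQL JDBC driver (mysql-connector-java) must be available to the map and reduce tasks as well as the client, for example by passing it with the -libjars generic option or placing it in the cluster's lib directory; this is a common cause of ClassNotFoundException with DBOutputFormat.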

