设为首页 加入收藏

TOP

mapreduce输出数据存入HBase中
2019-01-06 01:49:27 】 浏览:28
Tags:mapreduce 输出 数据 存入 HBase
版权声明:分享的快乐。。 https://blog.csdn.net/baolibin528/article/details/46052111


数据格式:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200


事先在HBase里把表创建好:

create 'mr_hbases','cf'


代码如下:

package mrhbase;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


public class M {
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, LongWritable, Text>.Context context)
				throws IOException, InterruptedException {
			String line=value.toString();
			String[] splited=line.split("\t");
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
			String format = simpleDateFormat.format(new Date(Long.parseLong(splited[0].trim())));
			String rowKey=splited[1]+"_"+format;
			Text v2s = new Text();
			v2s.set(rowKey+"\t"+line);
			context.write(key, v2s);
		}
	}
	
	public static class MyReduce extends TableReducer<LongWritable,Text,NullWritable>{
		private String family="cf";
		@Override
		protected void reduce(LongWritable arg0, Iterable<Text> v2s,
				Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			for (Text v2 : v2s) {
				String[] splited=v2.toString().split("\t");
				String rowkey=splited[0];
				Put put = new Put(rowkey.getBytes());
				put.add(family.getBytes(), "raw".getBytes(),v2.toString().getBytes());
				put.add(family.getBytes(), "rePortTime".getBytes(),splited[1].getBytes());
				put.add(family.getBytes(), "msisdn".getBytes(),splited[2].getBytes());
				put.add(family.getBytes(), "apmac".getBytes(),splited[3].getBytes());
				put.add(family.getBytes(), "acmac".getBytes(),splited[4].getBytes());
				put.add(family.getBytes(), "host".getBytes(),splited[5].getBytes());
				put.add(family.getBytes(), "siteType".getBytes(),splited[6].getBytes());
				put.add(family.getBytes(), "upPackNum".getBytes(),splited[7].getBytes());
				put.add(family.getBytes(), "downPackNum".getBytes(),splited[8].getBytes());
				put.add(family.getBytes(), "upPayLoad".getBytes(),splited[9].getBytes());
				put.add(family.getBytes(), "downPayLoad".getBytes(),splited[10].getBytes());
				put.add(family.getBytes(), "httpStatus".getBytes(),splited[11].getBytes());
				context.write(NullWritable.get(), put);
			}
		}
	}
	
	private static final String TableName="mr_hbases";
	@SuppressWarnings("deprecation")
	public static void main(String[] args) throws Exception {
		
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://192.168.1.10:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "192.168.1.10:2181");
		conf.set(TableOutputFormat.OUTPUT_TABLE, TableName);
		
	
		Job job = new Job(conf, M.class.getSimpleName());
		TableMapReduceUtil.addDependencyJars(job);
		job.setJarByClass(M.class);
		
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, "hdfs://192.168.1.10:9000/wlan");
		
		job.waitForCompletion(true);
	}
	
}




编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka+sparkstreaming+hbase 下一篇Hbase和Hive以及传统数据库的区别

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }