设为首页 加入收藏

TOP

mapreduce输出数据存入HBase中
2019-01-06 01:49:27 浏览:41
Tags:mapreduce 输出 数据 存入 HBase
版权声明:分享的快乐。。 https://blog.csdn.net/baolibin528/article/details/46052111


数据格式:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200


事先在HBase里把表创建好:

create 'mr_hbases','cf'


代码如下:

package mrhbase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


public class M {
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, LongWritable, Text>.Context context)
				throws IOException, InterruptedExce
		    

ption { String line=value.toString(); String[] splited=line.split("\t"); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); String format = simpleDateFormat.format(new Date(Long.parseLong(splited[0].trim()))); String rowKey=splited[1]+"_"+format; Text v2s = new Text(); v2s.set(rowKey+"\t"+line); context.write(key, v2s); } } public static class MyReduce extends TableReducer<LongWritable,Text,NullWritable>{ private String family="cf"; @Override protected void reduce(LongWritable arg0, Iterable<Text> v2s, Reducer<LongWritable, Text, NullWritable, Mutation>.Context context) throws IOException, InterruptedException { for (Text v2 : v2s) { String[] splited=v2.toString().split("\t"); String rowkey=splited[0]; Put put = new Put(rowkey.getBytes()); put.add(family.getBytes(), "raw".getBytes(),v2.toString().getBytes()); put.add(family.getBytes(), "rePortTime".getBytes(),splited[1].getBytes()); put.add(family.getBytes(), "msisdn".getBytes(),splited[2].getBytes()); put.add(family.getBytes(), "apmac".getBytes(),splited[3].getBytes()); put.add(family.getBytes(), "acmac".getBytes(),splited[4].getBytes()); put.add(family.getBytes(), "host".getBytes(),splited[5].getBytes()); put.add(family.getBytes(), "siteType".getBytes(),splited[6].getBytes()); put.add(family.getBytes(), "upPackNum".getBytes(),splited[7].getBytes()); put.add(family.getBytes(), "downPackNum".getBytes(),splited[8].getBytes()); put.add(family.getBytes(), "upPayLoad".getBytes(),splited[9].getBytes()); put.add(family.getBytes(), "downPayLoad".getBytes(),splited[10].getBytes()); put.add(family.getBytes(), "httpStatus".getBytes(),splited[11].getBytes()); context.write(NullWritable.get(), put); } } } private static final String TableName="mr_hbases"; @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://192.168.1.10:9000/hbase"); conf.set("hbase.zookeeper.quorum", 
"192.168.1.10:2181"); conf.set(TableOutputFormat.OUTPUT_TABLE, TableName); Job job = new Job(conf, M.class.getSimpleName()); TableMapReduceUtil.addDependencyJars(job); job.setJarByClass(M.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReduce.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://192.168.1.10:9000/wlan"); job.waitForCompletion(true); } }




编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka+sparkstreaming+hbase 下一篇Hbase和Hive以及传统数据库的区别

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }