设为首页 加入收藏

TOP

用户行为日志导入到HBase表
2018-12-07 01:57:40 】 浏览:24
Tags:用户 行为 日志 导入 HBase
userlog.txt
1524723368732	15833995577	00-50-56-C0-00-08	192.168.106.1	视频网站	100	200
1524723368732	15833995577	00-50-56-C0-00-08	192.168.106.1	视频网站	100	200
1524723368732	15833995577	00-50-56-C0-00-08	192.168.106.1	视频网站	100	200
1524723368732	15833995577	00-50-56-C0-00-08	192.168.106.1	视频网站	100	200
1524723368732	15833995577	00-50-56-C0-00-08	192.168.106.1	视频网站	100	200
1524723368720	15833995588	00-50-56-C0-00-09	192.168.106.2	视频网站	200	300
1524723368720	15833995588	00-50-56-C0-00-09	192.168.106.2	视频网站	200	300
1524723368720	15833995588	00-50-56-C0-00-09	192.168.106.2	视频网站	200	300
1524723368720	15833995588	00-50-56-C0-00-09	192.168.106.2	视频网站	200	300
1524723368720	15833995588	00-50-56-C0-00-09	192.168.106.2	视频网站	200	300
1524723368750	15533995566	00-50-56-C0-00-10	192.168.106.3	图片网站	500	400
1524723368750	15533995566	00-50-56-C0-00-10	192.168.106.3	图片网站	500	400
1524723368750	15533995566	00-50-56-C0-00-10	192.168.106.3	图片网站	500	400
1524723368750	15533995566	00-50-56-C0-00-10	192.168.106.3	图片网站	500	400
1524723368750	15533995566	00-50-56-C0-00-10	192.168.106.3	图片网站	500	400
package com.lhjava.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org
		    

.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; /** * 用户行为日志导入到HBase表 */ public class HBaseImportApp { public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); String time = sdf.format(new Date(Long.parseLong(splits[0]))); String rowkey = splits[1] + "_" + time; Text outputValue = new Text(); outputValue.set(rowkey + "\t" + value.toString()); context.write(key, outputValue); } } public static class MyReducer extends TableReducer<LongWritable, Text, NullWritable>{ String cf = "info"; @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { String[] splits = value.toString().split("\t"); Put put = new Put(Bytes.toBytes(splits[0])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("time"), Bytes.toBytes(splits[1])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tel"), Bytes.toBytes(splits[2])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mac"), Bytes.toBytes(splits[3])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("ip"), Bytes.toBytes(splits[4])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("type"), Bytes.toBytes(splits[5])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("up"), Bytes.toBytes(splits[6])); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("down"), Bytes.toBytes(splits[7])); context.write(NullWritable.get(), put); } } } public static void main(String[] args) throws Exception{ Configuration configuration = new Configuration(); configuration.set("hbase.rootdir", "hdfs://luheng:8082/hbase"); configuration.set("hbase.zookeeper.quorum", "luheng:2181"); configuration.set(TableOutputFormat.OUTPUT_TABLE, args[0]); Job job = new Job(configuration, "HBaseImportApp"); job.setJarByClass(HBaseImportApp.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPaths(job, args[1]); job.setOutputFormatClass(TableOutputFormat.class); job.waitForCompletion(true); } }
hadoop jar aa-1.0-SNAPSHOT.jar com.lhjava.hbase.HBaseImportApp user_log /data/userlog.txt




编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇大数据的存储—HBase 下一篇Hbase常用操作(增删改查)

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }