Parsing HDFS Text Files and Writing Them to HBase in Java
Copyright notice: for learning and exchange only; reposting without the author's permission is prohibited, as is commercial use. https://blog.csdn.net/u012965373/article/details/79267947

First, create the table in HBase; see my previous post on operating HBase from Java.
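For readers without that post at hand, here is a minimal sketch of creating the rd:app_log table with the HBase 2.x Java client. The namespace rd and the column family cf are taken from the code below; everything else (connection settings coming from an hbase-site.xml on the classpath, the class name) is an assumption, and the original post may well have used the older HTableDescriptor API instead:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateAppLogTable {
    public static void main(String[] args) throws Exception {
        // Connection settings are read from hbase-site.xml on the classpath.
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create the "rd" namespace if it does not exist yet.
            boolean nsExists = false;
            for (NamespaceDescriptor ns : admin.listNamespaceDescriptors()) {
                if (ns.getName().equals("rd")) { nsExists = true; break; }
            }
            if (!nsExists) {
                admin.createNamespace(NamespaceDescriptor.create("rd").build());
            }
            // Create rd:app_log with a single column family "cf".
            TableName table = TableName.valueOf("rd:app_log");
            if (!admin.tableExists(table)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}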

Then the code:

package com.xxx.report.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xxx.report.config.Constants;
import com.xxx.report.util.HbaseUtilQA;
import com.xxx.report.util.MD5Util;
import com.xxx.report.util.TimeUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author yangxin-ryanx
 */
public class LockAppLog2Hbase implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(LockAppLog2Hbase.class);
    // SimpleDateFormat is not thread-safe, but here it is only used on the driver.
    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    private static final String CF_NAME = "cf";
    private static final String TABLE_NAME = "rd:app_log";

    public void run(String master, String startTime, String endTime) {
        long start = System.currentTimeMillis();
        LOG.info("Start running the log parser...");
        // Accept dates as either "yyyy-MM-dd" or "yyyyMMdd".
        startTime = startTime.replace("-", "");
        endTime = endTime.replace("-", "");
        SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_YangXin).setMaster(master);
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        // Build the list of days in [startTime, endTime], one "yyyyMMdd" string per day.
        List<String> list = Lists.newArrayList();
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.YEAR, Integer.parseInt(startTime.substring(0, 4)));
        calendar.set(Calendar.MONTH, Integer.parseInt(startTime.substring(4, 6)) - 1);
        calendar.set(Calendar.DATE, Integer.parseInt(startTime.substring(6, 8)));
        String date = startTime;
        while (!date.equals(endTime)) {
            list.add(date);
            calendar.add(Calendar.DATE, 1);
            date = simpleDateFormat.format(calendar.getTime());
        }
        list.add(endTime);


        for (String day : list) {
            // The wildcards match the hour/host subdirectories under each day's directory.
            StringBuilder path = new StringBuilder();
            path.append(Constants.PREFIX_PATH_YangXin).append(day).append("/*/*");

            JavaRDD<String> rdd = javaSparkContext.textFile(path.toString());

            // Skip days with no data (note: isEmpty() triggers a Spark job of its own).
            if (rdd.isEmpty()) {
                continue;
            }
            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void call(Iterator<String> logs) throws Exception {
                    while (logs.hasNext()) {
                        String log = logs.next();
                        try {
                            // Field 10 of the space-separated line is the device SN; "-" means missing.
                            String snValue = log.split(" ")[10];
                            if ("-".equals(snValue)) {
                                continue;
                            }
                            handleLog(log);
                        } catch (Exception e) {
                            // Log and continue so one malformed line does not abort the partition.
                            LOG.error("Failed to handle log line: " + log, e);
                        }
                    }
                }
            });
            // Flush buffered writes for this day before moving on to the next.
            HbaseUtilQA.flush(TABLE_NAME);
        }
        // Stop the context so the application releases its resources and exits cleanly.
        javaSparkContext.stop();
        long end = System.currentTimeMillis();
        LOG.info("Elapsed: " + (end - start) / 1000 + " s");
    }


    private void handleLog(String line) {
        MD5Util md5Util = new MD5Util();
        // Split once instead of three times; fields 0 and 1 are the date and time
        // parts of the log timestamp, field 10 is the device SN.
        String[] fields = line.split(" ");
        String snValue = fields[10];
        String timePart1 = fields[0];
        String timePart2 = fields[1];
        String time = timePart1 + " " + timePart2;
        // Row key: MD5 over SN + day, so one device's logs for a day share a row.
        String rowKey = md5Util.md5Function(snValue, timePart1);
        Long timeStamp = TimeUtil.getTimeStamp(time);
        Map<String, String> lockAppMap = Maps.newHashMap();
        // Column qualifier: MD5 of the full line plus the time, to keep qualifiers unique.
        String key = md5Util.md5Key(line) + "_" + timePart2.replace(".", "").replace(":", "");
        lockAppMap.put(key, line);
        HbaseUtilQA.addRecords(TABLE_NAME, rowKey, timeStamp, CF_NAME.getBytes(), lockAppMap);
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: LockAppLog2Hbase <master> <startTime> <endTime>");
            System.exit(1);
        }
        LockAppLog2Hbase lockAppLog2Hbase = new LockAppLog2Hbase();
        String master = args[0];
        String startTime = args[1];
        String endTime = args[2];
        lockAppLog2Hbase.run(master, startTime, endTime);
    }
}
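The class takes the Spark master and the date range as program arguments (the master is applied via setMaster rather than --master). A hypothetical invocation, where the jar name report-jobs.jar is made up for illustration:

spark-submit --class com.xxx.report.service.LockAppLog2Hbase report-jobs.jar local[4] 2018-01-01 2018-01-31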


The idea: build the HDFS path for each day, load it with Spark, then read each partition, parse every line, and write the records into HBase one by one, flushing once per day.
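The HbaseUtilQA helper is not shown in the post. Below is a minimal sketch of what its addRecords and flush methods might look like, assuming an HBase 2.x BufferedMutator kept per table; only the two method signatures (as called above) come from the post, every internal detail here is an assumption:

import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;

public class HbaseUtilQA {
    private static Connection connection;
    private static final Map<String, BufferedMutator> MUTATORS = Maps.newConcurrentMap();

    // Lazily open one shared connection and one BufferedMutator per table.
    private static synchronized BufferedMutator mutator(String tableName) throws IOException {
        if (connection == null) {
            connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        }
        BufferedMutator m = MUTATORS.get(tableName);
        if (m == null) {
            m = connection.getBufferedMutator(TableName.valueOf(tableName));
            MUTATORS.put(tableName, m);
        }
        return m;
    }

    // One Put per row key; each map entry becomes a cell under the given column
    // family, stamped with the log line's own timestamp.
    public static void addRecords(String tableName, String rowKey, Long timeStamp,
                                  byte[] cf, Map<String, String> values) {
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (Map.Entry<String, String> e : values.entrySet()) {
                put.addColumn(cf, Bytes.toBytes(e.getKey()), timeStamp, Bytes.toBytes(e.getValue()));
            }
            mutator(tableName).mutate(put);
        } catch (IOException e) {
            throw new RuntimeException("HBase write failed for table " + tableName, e);
        }
    }

    // Push any locally buffered mutations to the region servers.
    public static void flush(String tableName) {
        try {
            BufferedMutator m = MUTATORS.get(tableName);
            if (m != null) {
                m.flush();
            }
        } catch (IOException e) {
            throw new RuntimeException("HBase flush failed for table " + tableName, e);
        }
    }
}

One caveat with a buffered design like this: foreachPartition runs on the executors, so each executor JVM would hold its own connection and buffer, while the flush(TABLE_NAME) call in run() executes on the driver. For that driver-side flush to matter, the real helper would either have to write synchronously or flush at the end of each partition.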


