SparkSQL + HBase + HDFS: Complete SQL Encapsulation (Part 1)

1. Description

By reading the SQL statements stored in script files, business processing on the big-data platform can be driven entirely by SQL. With this encapsulation, colleagues without a Java (or any programming-language) background can implement their business logic by writing nothing but SQL.

2. Environment

Spark: spark-2.2.1-bin-hadoop2.7

3. Approach

Read the SQL script file from HDFS (it could just as well live on the local Linux filesystem), parse it to extract the source tables, the target table, the target table's column names, and the SELECT statement that SparkSQL needs, then run that SELECT through SparkSession.sql and save the returned result into the target table. A condensed sketch of the flow is shown below.
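Condensed into a few lines, the whole flow (using the helper classes developed in section 4) looks roughly like this sketch:

// Condensed sketch of the flow; SparkSQLUtils is defined in section 4 below.
Map confInfoMap = SparkSQLUtils.getSQLConfig("deal", "org");                   // read + parse the SQL script
List<String> srcTables  = (List<String>) confInfoMap.get("srcTableName");      // source tables used by the SELECT
List<String> tgtColumns = (List<String>) confInfoMap.get("tgtTableColumn");    // target table columns
SparkSession spark = SparkSQLUtils.getSparkSQL(srcTables, tgtColumns);         // register each HBase table as a temp view
Dataset<Row> result = spark.sql((String) confInfoMap.get("SQL"));              // run the business SELECT
result.show();                                                                 // or write it back to the target HBase table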

4. Implementation

4.1 Reading the SQL script file

SQL parsing is handled by jsqlparser; add the following dependency to Maven:

        <!-- SQL parser -->
        <dependency>
            <groupId>com.github.jsqlparser</groupId>
            <artifactId>jsqlparser</artifactId>
            <version>1.1</version>
        </dependency>
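As a quick sanity check of what jsqlparser gives us for an INSERT ... SELECT statement, here is a minimal sketch (it belongs inside a method that declares throws JSQLParserException and uses the jsqlparser and java.io.StringReader imports shown in the SQLAnalysisUtils class further down):

// Sketch: parse an INSERT ... SELECT and read back its pieces.
CCJSqlParserManager pm = new CCJSqlParserManager();
Insert insert = (Insert) pm.parse(new StringReader(
        "INSERT INTO org_table(id) SELECT a.ygbh AS id FROM yuan_gong_biao a"));
System.out.println(insert.getTable().getName());   // org_table
System.out.println(insert.getColumns());           // [id]
System.out.println(insert.getSelect());            // the SELECT part of the statement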

A sample SQL script:

INSERT INTO org_table(
           id
          ,ygbh 
          ,ygzh  
          ,zwxm         
          ,zsxm       
          ,bmmc 
          ,xjsj 
)
SELECT DISTINCT 
      a.ygbh                       AS id
     ,a.ygbh                       AS ygbh   /**employee ID**/
     ,a.ygzh                       AS ygzh   /**account**/
     ,a.zwxm                       AS zwxm   /**Chinese name**/
     ,concat(a.ygbh,'-',a.zsxm)    AS zsxm   /**real name**/
     ,c.bmmc                       AS bmmc   /**department name**/
     ,current_timestamp()          AS xjsj   /**creation time**/
FROM 
          yuan_gong_biao a 
INNER JOIN  
          (SELECT 
                  b.bmid AS bmid 
                 ,b.bmmc         AS bmmc 
              FROM 
                 bu_men_biao b 
           UNION ALL
           SELECT 
                  b.bmid AS bmid 
                 ,b.bmmc         AS bmmc 
              FROM 
                 bu_men_biao b 
           )  c
      ON  a.bmid = c.bmid  
WHERE
      a.ygbh LIKE '81%'

The method that reads the SQL script file:

    /**
     * Read the file at the given [HDFS] path and return its lines
     * @param filePath
     * @return
     */
    private static List<String> getFileValue(String filePath){
        List<String> fileValueList = new ArrayList<>();
        Configuration conf = new Configuration();
        BufferedReader confBuff = null;
        try {
            //file system handle for HDFS
            FileSystem fs = FileSystem.get(conf);
            //path of the script to read
            Path confPath = new Path(filePath);
            //open an input stream on the file
            InputStream confIn = fs.open(confPath);
            //wrap it in a buffered reader so we can read line by line
            confBuff = new BufferedReader(new InputStreamReader(confIn));
            String confStr=null;
            String keyOneStr= null;
            String keyTwoStr=null;
            while((confStr=confBuff.readLine())!=null){
                //strip inline comments
                if(confStr.trim().length()!=0) {
                    if(confStr.trim().length()>=3){
                        keyOneStr = confStr.trim().substring(0, 3);
                        keyTwoStr = confStr.trim().substring(0, 2);
                        //skip lines that start with a comment marker
                        if (!keyOneStr.equals("/**") && !keyTwoStr.equals("--")) {
                            //no comment on this line
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr);
                            }
                            //both markers present and /** comes first
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") < confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**"))+" ");
                            }
                            //both markers present and -- comes first
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") > confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--"))+" ");
                            }
                            //only a /** comment on this line
                            if (confStr.indexOf("/**") > -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**"))+" ");
                            }
                            //only a -- comment on this line
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") > -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--"))+" ");
                            }
                        }
                    }else{
                        fileValueList.add(confStr+" ");
                    }
                }
            }
            confBuff.close();
            confIn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return fileValueList;
    }

The method that parses the SQL script:

    /**
     * Extract the target table name and insert columns from the given SQL statement
     * @param sqlStr
     * @return
     * @throws JSQLParserException
     */
    public static Map getSQLPares(String sqlStr) throws JSQLParserException {
        //everything extracted from the SQL goes into this map
        Map sqlPareMap = new HashMap();
        //create the parser
        CCJSqlParserManager pm = new CCJSqlParserManager();
        //parse the statement into an Insert object
        System.out.println("sqlStr ================ " + sqlStr);
        Insert insertStatement = (Insert) pm.parse(new StringReader(sqlStr));
        //name of the target table being inserted into
        String insertTableName=insertStatement.getTable().getName();
        //store it in the map
        sqlPareMap.put("tgtTableName",insertTableName);
        //look up the column names of the target table
        List<String> tgtTableColumnList = HBaseUtils.getTableColumn(insertTableName);
        //if the target table is empty, fall back to the columns listed in the SQL itself
        if(tgtTableColumnList==null||tgtTableColumnList.size()==0){
            tgtTableColumnList = getColumnName(insertStatement);
        }
        //store the column name list in the map
        sqlPareMap.put("tgtTableColumn", tgtTableColumnList);
        //store the SELECT statement that follows the INSERT
        sqlPareMap.put("SQL",insertStatement.getSelect().toString());
        //parse the SELECT part into a Select object
        Select selectStatement = (Select) pm.parse(new StringReader(insertStatement.getSelect().toString()));
        TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
        //list of source table names used by the SELECT
        List<String> tableNameList = tablesNamesFinder.getTableList(selectStatement);
        //store it in the map
        sqlPareMap.put("srcTableName",tableNameList);
        return sqlPareMap;
    }

Putting it together in a driver:

public static void main(String[] args){
        //directory and file name of the SQL script
        Map confInfoMap = SparkSQLUtils.getSQLConfig("deal","org");
        //fixed pattern: list of source table names
        List<String> tableNameList =(ArrayList<String>)confInfoMap.get("srcTableName");
        for(String srcTableName:tableNameList){
            System.out.println("=src==============="+srcTableName);
        }
        //fixed pattern: the SELECT statement that carries the business logic
        String selSQL = (String)confInfoMap.get("SQL");
        System.out.println("=sql==============="+selSQL);
        //fixed pattern: target table name
        String tgtTable = (String)confInfoMap.get("tgtTableName");
        System.out.println("=tgt==============="+tgtTable);
        //fixed pattern: column names of the target table
        List<String> columnList = (List<String>)confInfoMap.get("tgtTableColumn");

        //truncate the target table before loading (disabled here)
//        HBaseUtils.truncateData(tgtTable);
        SparkSession sparkSession = SparkSQLUtils.getSparkSQL(tableNameList,columnList);
        Dataset<Row> nameDf=sparkSession.sql(selSQL);
//        List<Row> recordList=nameDf.collectAsList();
//        List<Put> putList = new ArrayList<>();
//        for(Row record:recordList){
//            Put recordPut = new Put(Bytes.toBytes(record.getAs("id").toString()));
//            for(String columnName:columnList){
//                recordPut.addColumn(Bytes.toBytes("data"), Bytes.toBytes(columnName), Bytes.toBytes(record.getAs(columnName).toString()));
//            }
//            putList.add(recordPut);
//        }

//        HBaseUtils.insertData(tgtTable,putList);
        nameDf.show();
    }
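The write-back to HBase is commented out above; collecting the whole result to the driver with collectAsList only works for small outputs. Below is a sketch (not from the original code) of enabling the write without collecting, assuming the same "data" column family and the ZooKeeper settings used in SparkHBaseUtils; it needs imports for org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.TableName, org.apache.hadoop.hbase.client.* and org.apache.hadoop.hbase.util.Bytes:

        // Partition-wise write-back sketch: each executor opens its own HBase
        // connection and writes Puts in batches.
        nameDf.toJavaRDD().foreachPartition(rows -> {
            Configuration hc = HBaseConfiguration.create();
            hc.set("hbase.zookeeper.quorum", "192.168.0.100");          // same quorum as SparkHBaseUtils
            hc.set("hbase.zookeeper.property.clientPort", "2181");
            try (Connection hConn = ConnectionFactory.createConnection(hc);
                 Table table = hConn.getTable(TableName.valueOf(tgtTable))) {
                List<Put> puts = new ArrayList<>();
                while (rows.hasNext()) {
                    Row record = rows.next();
                    Put put = new Put(Bytes.toBytes(record.getAs("id").toString()));
                    for (String columnName : columnList) {
                        Object value = record.getAs(columnName);
                        if (value != null) {                             // skip null cells
                            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes(columnName),
                                    Bytes.toBytes(value.toString()));
                        }
                    }
                    puts.add(put);
                    if (puts.size() >= 1000) { table.put(puts); puts.clear(); }   // flush in batches
                }
                if (!puts.isEmpty()) { table.put(puts); }
            }
        });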


The SparkSQL wrapper class:

import net.sf.jsqlparser.JSQLParserException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class SparkSQLUtils {

    public static SparkSession getSparkSQL(List<String> tableNameList,List<String> columnList){
        //create the SparkSession
        SparkSession sparkSQL= getSparkSession();
        //wrap its SparkContext in a JavaSparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSQL.sparkContext());
        //HBase configuration
        Configuration hbaseConfig = SparkHBaseUtils.getConfiguration();

        //register every source table as a Spark temp view
        for(String tableNameStr:tableNameList){
            hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableNameStr);
            JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            //RDD of column qualifiers
            JavaRDD<List<String>> recordColumnRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>, List<String>>() {
                public List<String>  call(Tuple2<ImmutableBytesWritable, Result> tuple) {
                    List<String> recordColumnList = new ArrayList();
                    Result result = tuple._2;
                    Cell[] cells = result.rawCells();
                    for (Cell cell : cells) {
                        recordColumnList.add(new String(CellUtil.cloneQualifier(cell)));
                    }
                    return recordColumnList;
                }
            });

            //RDD of row values
            JavaRDD<Row> recordValuesRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>, Row>() {
                public Row  call(Tuple2<ImmutableBytesWritable, Result> tuple) {
                    List<String> recordList = new ArrayList();
                    Result result = tuple._2;
                    Cell[] cells = result.rawCells();
                    for (Cell cell : cells) {
                        recordList.add(new String(CellUtil.cloneValue(cell)));
                    }
                    return RowFactory.create(recordList.toArray());
                }
            });

            //build the schema fields
            List<StructField> structFields=new ArrayList<StructField>();
//            if(columnList!=null){
//                for(String columnStr:columnList){
//                    structFields.add(DataTypes.createStructField(columnStr, DataTypes.StringType, true));
//                }
//            }else{
                List<String> fieldsList=recordColumnRDD.first();
                for(String columnStr:fieldsList){
                    structFields.add(DataTypes.createStructField(columnStr, DataTypes.StringType, true));
                }
//            }

            //create the schema
            StructType schema=DataTypes.createStructType(structFields);
            Dataset<Row> employeesDataset= sparkSQL.createDataFrame(recordValuesRDD,schema);
            employeesDataset.printSchema();
            //register the Spark temp view
            employeesDataset.createOrReplaceTempView(tableNameStr);
        }
        return sparkSQL;
    }

    public static Map getSQLConfig(String path,String fileName){
        Map tableConfigMap=null;
        //path of the SQL script on HDFS
        String sqlPath=null;
        if(path==null){
            sqlPath="/user/app/hbase_table/sysconfig/sql/"+fileName+".sql";
        }else{
            sqlPath="/user/app/hbase_table/sysconfig/sql/"+path+"/"+fileName+".sql";
        }
        List<String> fileValueList = getFileValue(sqlPath);
        String sqlValueStr="";
        for(String lineStr:fileValueList){
                sqlValueStr=sqlValueStr+lineStr;
        }
        try {
            tableConfigMap = SQLAnalysisUtils.getSQLPares(sqlValueStr);
//            tableConfigMap.put("SQL",sqlValueStr);
        } catch (JSQLParserException e) {
            e.printStackTrace();
        }
        return tableConfigMap;
    }

    /**
     * Read the file at the given [HDFS] path and return its lines
     * @param filePath
     * @return
     */
    private static List<String> getFileValue(String filePath){
        List<String> fileValueList = new ArrayList<>();
        Configuration conf = new Configuration();
        BufferedReader confBuff = null;
        try {
            //file system handle for HDFS
            FileSystem fs = FileSystem.get(conf);
            //path of the script to read
            Path confPath = new Path(filePath);
            //open an input stream on the file
            InputStream confIn = fs.open(confPath);
            //wrap it in a buffered reader so we can read line by line
            confBuff = new BufferedReader(new InputStreamReader(confIn));
            String confStr=null;
            String keyOneStr= null;
            String keyTwoStr=null;
            while((confStr=confBuff.readLine())!=null){
                //strip inline comments
                if(confStr.trim().length()!=0) {
                    if(confStr.trim().length()>=3){
                        keyOneStr = confStr.trim().substring(0, 3);
                        keyTwoStr = confStr.trim().substring(0, 2);
                        //skip lines that start with a comment marker
                        if (!keyOneStr.equals("/**") && !keyTwoStr.equals("--")) {
                            //no comment on this line
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr);
                            }
                            //both markers present and /** comes first
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") < confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**"))+" ");
                            }
                            //both markers present and -- comes first
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") > confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--"))+" ");
                            }
                            //only a /** comment on this line
                            if (confStr.indexOf("/**") > -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**"))+" ");
                            }
                            //only a -- comment on this line
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") > -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--"))+" ");
                            }
                        }
                    }else{
                        fileValueList.add(confStr+" ");
                    }
                }
            }
            confBuff.close();
            confIn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return fileValueList;
    }


    private static SparkSession getSparkSession(){
        //local[2] is for local testing; on a cluster the master is supplied via spark-submit
        SparkSession spark= SparkSession.builder()
                .appName("SparkApp")
                .master("local[2]")
                .getOrCreate();
        return spark;
    }
}

The SQL parsing class:

import hbase.comm.HBaseUtils;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.util.TablesNamesFinder;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class SQLAnalysisUtils {

    /**
     * Extract the target table name and insert columns from the given SQL statement
     * @param sqlStr
     * @return
     * @throws JSQLParserException
     */
    public static Map getSQLPares(String sqlStr) throws JSQLParserException {
        //everything extracted from the SQL goes into this map
        Map sqlPareMap = new HashMap();
        //create the parser
        CCJSqlParserManager pm = new CCJSqlParserManager();
        //parse the statement into an Insert object
        System.out.println("sqlStr ================ " + sqlStr);
        Insert insertStatement = (Insert) pm.parse(new StringReader(sqlStr));
        //name of the target table being inserted into
        String insertTableName=insertStatement.getTable().getName();
        //store it in the map
        sqlPareMap.put("tgtTableName",insertTableName);
        //look up the column names of the target table
        List<String> tgtTableColumnList = HBaseUtils.getTableColumn(insertTableName);
        //if the target table is empty, fall back to the columns listed in the SQL itself
        if(tgtTableColumnList==null||tgtTableColumnList.size()==0){
            tgtTableColumnList = getColumnName(insertStatement);
        }
        //store the column name list in the map
        sqlPareMap.put("tgtTableColumn", tgtTableColumnList);
        //store the SELECT statement that follows the INSERT
        sqlPareMap.put("SQL",insertStatement.getSelect().toString());
        //parse the SELECT part into a Select object
        Select selectStatement = (Select) pm.parse(new StringReader(insertStatement.getSelect().toString()));
        TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
        //list of source table names used by the SELECT
        List<String> tableNameList = tablesNamesFinder.getTableList(selectStatement);
        //store it in the map
        sqlPareMap.put("srcTableName",tableNameList);
        return sqlPareMap;
    }

    /**
     * Return the column names that follow INSERT in the SQL statement
     * @param insertStatement
     * @return
     * @throws JSQLParserException
     */
    public static List<String> getColumnName(Insert insertStatement) throws JSQLParserException {
        List<String> columnNameList = new ArrayList<String>();
        List<Column> columns=insertStatement.getColumns();
        for(Column column:columns){
            System.out.println("tableColumn=============="+column.getColumnName());
            columnNameList.add(column.getColumnName());
        }
        return columnNameList;
    }

}
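A quick check of what the parser produces, as a sketch only: it assumes HBaseUtils.getTableColumn returns an empty list when the target table does not exist yet (so the column list falls back to the INSERT column list), and it belongs inside a method that declares throws JSQLParserException:

// Illustrative only: expected contents of the returned map for a small statement.
String sql = "INSERT INTO org_table(id, ygbh) "
           + "SELECT a.ygbh AS id, a.ygbh AS ygbh FROM yuan_gong_biao a";
Map parsed = SQLAnalysisUtils.getSQLPares(sql);
System.out.println(parsed.get("tgtTableName"));    // org_table
System.out.println(parsed.get("tgtTableColumn"));  // [id, ygbh]
System.out.println(parsed.get("srcTableName"));    // [yuan_gong_biao]
System.out.println(parsed.get("SQL"));             // the SELECT statement only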

The HBase helper class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;

public class SparkHBaseUtils {

    private static final String QUORUM = "192.168.0.100";
    private static final String CLIENTPORT = "2181";
    private static Configuration conf = null;
    private static Connection conn = null;
    private static JavaSparkContext context=null;

    /**
     * Return the shared JavaSparkContext
     * @return
     */
    public static synchronized JavaSparkContext getJavaSparkContext(){
        if(context==null){
            SparkConf sparkConf=new SparkConf();
            sparkConf.setAppName("SparkApp");
            sparkConf.setMaster("local[5]");
            //assign the static field rather than a new local variable,
            //otherwise the field stays null and callers always get null back
            context = new JavaSparkContext(sparkConf);
        }
        return context;
    }


    /**
     * Return the single shared Configuration instance
     * @return
     */
    public static synchronized Configuration getConfiguration()
    {
        if(conf == null)
        {
            conf =  HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", QUORUM);
            conf.set("hbase.zookeeper.property.clientPort", CLIENTPORT);
            conf.addResource("hbase-site.xml");
            conf.addResource("core-site.xml");
        }
        return conf;
    }

    /**
     * Return the single shared Connection instance
     * @return
     */
    public static synchronized Connection getHConnection() {

        if (conf == null){
            getConfiguration();
        }

        if(conn == null)
        {
            /*
             * Create a Connection (legacy HConnectionManager style shown for reference):
             * HConnection connection = HConnectionManager.createConnection(conf);
             * HTableInterface table = connection.getTable("mytable");
             * table.get(...); ...
             * table.close();
             * connection.close();
             */
            try {
                conn  = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return conn;
    }





    //close the shared connection
    public static void connClose() {
        try {
            if (null != conn)
                conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
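
The HBaseUtils class referenced throughout (getTableColumn, truncateData, insertData) is not included in this first part. The sketch below shows one way those three helpers could be written on top of SparkHBaseUtils.getHConnection(); note that getTableColumn here infers the column qualifiers by scanning the first row, which assumes the table already holds data and that every row carries the same columns:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the HBaseUtils helper referenced in the article;
 * the original implementation is not shown, so treat this as one possible version.
 */
public class HBaseUtils {

    /** Infer column qualifiers by scanning the first row of the table.
     *  Returns an empty list for an empty or missing table. */
    public static List<String> getTableColumn(String tableName) {
        List<String> columns = new ArrayList<>();
        Connection conn = SparkHBaseUtils.getHConnection();
        try (Table table = conn.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            Result first = scanner.next();
            if (first != null) {
                for (Cell cell : first.rawCells()) {
                    columns.add(new String(CellUtil.cloneQualifier(cell)));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return columns;
    }

    /** Disable and truncate the target table before reloading it. */
    public static void truncateData(String tableName) {
        Connection conn = SparkHBaseUtils.getHConnection();
        try (Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf(tableName);
            if (admin.isTableEnabled(tn)) {
                admin.disableTable(tn);
            }
            admin.truncateTable(tn, false); // false = do not preserve region splits
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Batch-write the prepared Put list into the target table. */
    public static void insertData(String tableName, List<Put> putList) {
        Connection conn = SparkHBaseUtils.getHConnection();
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            table.put(putList);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}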

