设为首页 加入收藏

TOP

借鉴sqoop实现hdfs文件内容导入mysql
2019-04-22 00:21:05 】 浏览:50
Tags:借鉴 sqoop 实现 hdfs 文件 内容 导入 mysql

这次需要将hadoop mr的计算结果导入到mysql中,虽然是mr的结果导入db中,为了保险起见,还是存在hdfs上,之后读取hdfs上的结果导入db中,读取失败可重新执行单个读取导入过程。


一般先动手前,有个思路,再百度看看是否有更好的实现,大略搜了一下,发现sqoop貌似实现了hdfs和各种dc之间的读取写入。这里,因为业务简单,都是insert语句不涉及事务,只是连接一个db,不涉及mr等操作,so我只是借鉴sqoop的思想,没有使用sqoop。


一般实现的思路就是,读取hdfs文件,生成对应的insert语句,导入mysql就好了。

其中需要详细考虑的几个问题如下:

1、批量导入insert,一般的数据量设置多大好些?

2、执行一般失败后重新导入数据,对于已经导入的数据如何处理?



这里的话,当然批量导入会好些,但是也要考虑hdfs的reader是一个个读取数据,如果批量导入的size太大,需要存储数据的变量占用的内存大,会导致oom。

一般批量insert的量不超过1000条就好了,我这边的话,每天的量也就2000条,so我设置成500条了。


执行一般失败后重新导入数据,感觉要先删除已有导入的数据,再次导入。因为我这边每天导入一次,根据日期做delete就行了。对于不同场景,需要自己操作了。

如果是mysql数据库的话,不用自己删除了。

因为有个语句 INSERT INTO ..ON DUPLICATE KEY UPDATE ,如果执行时不存在重复记录,则执行新增操作,否则执行更新操作。详细的内容可以读http://www.jb51.net/article/39255.htm

如果db是mysql的话,就不用自己先删除数据了。



INSERT INTO TABLE (a,c) VALUES (1,3),(1,7) ON DUPLICATE KEY UPDATE c=VALUES(c);

上语句比单独执行2条语句快。



本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。





至此说完。。。



------------------------------

贴些代码(部分是sqoop中的源码改编)



    /**
     * 如果记录重复,则执行更新操作,否则执行新增数据操作
     * @param numRows
     * @return
     */
    protected String getUpdateStatement(int numRows) {
        boolean first;
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO ");
        sb.append(tableName);
        sb.append("(");
        first = true;
        for (String column : columnNames) {
            if (first) {
                first = false;
            } else {
                sb.append(", ");
            }
            sb.append(column);
        }

        sb.append(") VALUES(");
        for (int i = 0; i < numRows; i++) {
            if (i > 0) {
                sb.append("),(");
            }
            for (int j = 0; j < columnNames.length; j++) {
                if (j > 0) {
                    sb.append(", ");
                }
                sb.append("");
            }
        }

        sb.append(") ON DUPLICATE KEY UPDATE ");

        first = true;
        for (String column : columnNames) {
            if (first) {
                first = false;
            } else {
                sb.append(", ");
            }

            sb.append(column).append("=VALUES(").append(column).append(")");
        }

        String query = sb.toString();
        LOG.debug("Using upsert query: " + query);
        return query;
    }


    protected PreparedStatement getPreparedStatement(List<Writable> userRecords) throws SQLException {

        PreparedStatement stmt = null;

        // Synchronize on connection to ensure this does not conflict
        // with the operations in the update thread.
        Connection conn = ConnectionRelate.getConn();
        stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
        // Inject the record parameters into the UPDATE and WHERE clauses. This
        // assumes that the update key column is the last column serialized in
        // by the underlying record. Our code auto-gen process for exports was
        // responsible for taking care of this constraint.
        int i = 0;
        for (Writable record : userRecords) {
           setParam(stmt, i, record);
           i += columnNames.length;
        }
        stmt.addBatch();

        return stmt;
    }



 try {
            reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, new Configuration());
            StringTriWritable key = new StringTriWritable();
            Writable value = (MediaScoreVector) reader.getValueClass().newInstance();
            while (reader.next(key, value)) {
               list.add(value);
               index++;
               if(index == batchSize){
                   index = 0;
                   //exp the data to the mysql
                   stmt = mediaSQL.getPreparedStatement(list);
                   stmt.executeBatch();

                   list.clear();
               }
            }

            //deal with size less batchsize
            if(list.size()>0) {
                stmt = mediaSQL.getPreparedStatement(list);
                stmt.executeBatch();

                list.clear();
                list = null;
            }


        } catch (Exception e) {
            。。。。
        } finally {
            if(stmt!=null){
                
               。。。。
            }
            。。。。
        }



至此说完。。。


本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flink HDFS Sink 如何保证 exactl.. 下一篇Hadoop基础教程-第4章 HDFS的Java..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目