设为首页 加入收藏

TOP

Flume整合mysql碰到的自定义source表名问题
2019-04-23 14:08:32 】 浏览:77
Tags:Flume 整合 mysql 碰到 定义 source 问题
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_35347459/article/details/74898061
之前项目中碰到了一些关于flume采集的问题,把一些解决方法介绍一下,用于针对不同需求的采集需求。我所碰到的问题是flume采集mysql中数据的时候,表名不唯一的问题,我们常用flume-sql-source.jar的时候,在配置文件里面会加上 a3.sources.src-1.table=表名 ,这个表名不能更改,比如说每天采集的表名不是一样的,那每天都需要改这里的配置表名,而常用的%y%m%d这种配置是适配不出来每天的日期,因为这个配置会在flume-sql-source中按字符串读进去,并不能适配像shell脚本的匹配日期那样,碰到这种问题,无非是从flume-sql-source入手,更改它读取table字段的方式,制作一个满足特定需求的依赖即可,不用自己再写一个自定义source那么麻烦。

(1)首先查看一下flume-sql-source.jar中的内容,找到读取配置文件并赋值的类的位置,这里我找到位置在org.keedio.flume.source.SQLSourceHelper类中构造方法里面

public SQLSourceHelper(Context context, String sourceName) {
        this.statusFilePath = context.getString("status.file.path", "/var/lib/flume");
        this.statusFileName = context.getString("status.file.name");
        this.connectionURL = context.getString("connection.url");
        this.table=context.getString("table");  //------这个位置table的表名
        this.columnsToSelect = context.getString("columns.to.select", "*");
        this.runQueryDelay = context.getInteger("run.query.delay", Integer.valueOf(10000)).intValue();
        this.user = context.getString("user");
        this.password = context.getString("password");
        this.directory = new File(this.statusFilePath);
        this.customQuery = context.getString("custom.query");
        this.batchSize = context.getInteger("batch.size", Integer.valueOf(100)).intValue();
        this.maxRows = context.getInteger("max.rows", Integer.valueOf(10000)).intValue();
        this.hibernateDialect = context.getString("hibernate.dialect");
        this.hibernateDriver = context.getString("hibernate.connection.driver_class");
        this.sourceName = sourceName;
        this.statusColumn = context.getString("status.column");
        this.startFrom = context.getString("start.from");
        this.checkMandatoryProperties();
        if(!this.isStatusDirectoryCreated()) {
            this.createDirectory();
        }

        this.file = new File(this.statusFilePath + "/" + this.statusFileName);
        this.currentIndex = this.getStatusFileIndex(context.getInteger(this.startFrom, Integer.valueOf(0)).intValue());
        this.query = this.buildQuery();
    }

(2)如果需要改写jar包的话,还要反编译软件太麻烦,这里直接新建一个project,注意:新建的包名要与org.keedio.flume.source一致,然后将SQLSourceHelper类中的内容拷贝到新建的class中,注意class名也要一致,然后在pom里面配置一些依赖,没有错误之后,注释掉this.table=context.getString(“table”);然后加上一些更改:比如说我的需求的表名始终是n_加上当日日期,那只需要将this.table=context.getString(“table”)更改为如下:

SimpleDateFormat format = new SimpleDateFormat("yyMMdd");
String time = format.format(date);
this.table = "n_" + time;

然后没有报错之后,在进行编译,把编译好的SQLSourceHelper.class替换原来jar包中的SQLSourceHelper.class,保存上传到flume的lib目录,执行
bin/flume-ng agent -n a4 -c conf/ -f conf/a4.conf -Dflume.root.logger=INFO,console
之后就成功了,连接自己本地的mysql新建一个表名为n_170709做个测试,发现
这里写图片描述
测试成功,在hdfs上flume目录下就可看到采集的内容:
这里写图片描述
这里由于是在我自己的集群上,速度比较慢,我设置了每隔60s形成一个文件,所以采集出来是多个文件的格式。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hadoop生态圈(八):Flume 下一篇flume 之监视日志数据抽出到hdfs..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目