设为首页 加入收藏

TOP

flume 1.7 TailDir source重复获取数据集不释放资源解决办法
2019-05-03 14:10:09 】 浏览:81
Tags:flume 1.7 TailDir source 重复 获取 数据 释放 资源 解决 办法

背景:银行日志生产方式一般有两种

1)按大小切分:xxx.logxxx.log1 xxx.log2,及最新日志写入.log,原来的.log mv.log1.log1 mv.log2,依次类推,每个日志固定大小(10M50M之类)。

2)按天切分:xxx.log xxx.log-20171224(xxx.log-日期),最新日志写入.log,后面的按照日期备份,基本为每天一个日志。

目前缺陷:flume1.7增加了TAILDIR source,可以用于tail日志一个目录下的多个文件。但是 1)、出现数据重复(应用场景是用flume抓取应用日志,根据应用日志实时算交易量,所以会导致交易量增加),如果文件a.log达到定大小后,会被重命名为a.log.1,新建一个a.log,那么此时a.loga.log.1inode一样,文件名不一样,默认的flume taildir会认为这是都是一个新文件,就会重新读一遍,所以导致数据重复;2)、不释放资源,如果a.log.11a.log.12之前被flume监控过,现在把这两个文件删除了,flume不停止,资源不会被释放。

修改思路:文件如果更名后文件没有关闭先关闭原文件的句柄,之后判断文件是否更新,若更新,重新打开文件;没有更新,只修改文件路径

代码修改:taildir 包下 ReliableTaildirEventReader类的ReliableTaildirEventReader方法修改,

原代码为:

if(tf==null||!tf.getPath().equals(f.getAbsolutePath())){

long startPos=skipToEndf.length():0;

tf=openFile(f,headers,inode,startPos);

}else{…}

修改为:

if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {

//自定义代码开始

long startPos;

if(tf!=null){

boolean updated = tf.getLastUpdated() < f.lastModified();

if(tf.getRaf()!=null){//得到文件句柄

tf.close();

}

startPos = tf.getPos();

if(updated){ //更新

tf = openFile(f, headers, inode, startPos);

}else{//没更新

tf.updateFilePath(f.getAbsolutePath());

}

}else{

startPos = skipToEnd f.length() : 0;

tf = openFile(f, headers, inode, startPos);

}

//自定义代码结束

//源代码

} else {

然后将新代码打jar包,放入flume lib文件里,并在配置文件里面调用该source,测试结果,上面两个问题都得到了解决。


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flume实现日志文件夹数据加载到HD.. 下一篇Flume案例:实时采集python爬取的..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目