设为首页 加入收藏

TOP

flume学习(四):Flume Interceptors的使用
2018-11-13 16:14:46 】 浏览:71
Tags:flume 学习 Flume Interceptors 使用

对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

官方上提供的已有的拦截器有:

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor


像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。

Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到

Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。

Static Interceptor:可以在event的header中添加自定义的key和value。

Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。

Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分


下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:
public class WriteLog {
	protected static final Log logger = LogFactory.getLog(WriteLog.class);

	/**
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		while (true) {
			logger.info(new Date().getTime());
			logger.info("{\"requestTime\":"
					+ System.currentTimeMillis()
					+ ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"测试商家名称\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}");
			Thread.sleep(2000);

		}
	}
}
又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

修改后的flume.conf如下:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1

tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp

tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30

tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60

我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇log4j与flume整合配置(实现故障转.. 下一篇Flume与Kafka整合完成实时数据处理

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目