bsp; agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## interceptor相关配置 ########
agent.sources.s1.interceptors = i1
agent.sources.s1.interceptors.i1.type = com.flumePlugins.interceptor.ParseLogByRule$Builder
######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
启动flume-agent
bin/flume-ng agent --conf conf/ -f conf/agent.conf -Dflume.root.logger=INFO,console -name agent
启动kafka-console-consumer
./kafka-console-consumer --topic testTopic3 --bootstrap-server localhost:9092
写入测试数据到监控目录
#日志数据
log='{
"host":"www.baidu.com",
"user_id":"197878787878787",
"items":[
{
"item_type":"clothes",
"active_time":18989989
},
{
"item_type":"car",
"active_time":18989989
}
]
}'
#日志追加到文件
echo $log>> /Users/wangpei/tempData/flume/data/test.log
查看发送到kafka中的数据
#可以看到一条数据按规则被解析成了两条
{"active_time":18989989,"user_id":"197878787878787","item_type":"clothes","host":"www.baidu.com"}
{"active_time":18989989,"user_id":"197878787878787","item_type":"car","host":"www.baidu.com"}
自定义拦截器要点
A、实现Interceptor接口中intercept(Event event)方法和intercept(List<Event> events)方法。
B、创建内部类Builder实现Interceptor.Builder接口。
C、注意对异常数据的处理。防止Agent奔溃。
???结
通过拦截器确实实现了这一功能,为解决这一类问题提供了一种很好的思路,但逻辑太过复杂反而会降低flume同步效率,生产环境下还要多加验证。