设为首页 加入收藏

TOP

Flume 自定义拦截器(三)
2018-10-07 10:09:14 】 浏览:290
Tags:Flume 定义 拦截
bsp; agent.sources.s1.channels = c1
    # 指定sink使用的channel
    agent.sinks.r1.channel = c1


    ######## source相关配置 ########
    # source类型
    agent.sources.s1.type = TAILDIR
    # 元数据位置
    agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
    # 监控的目录
    agent.sources.s1.filegroups = f1
    agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
    agent.sources.s1.fileHeader = true


    ######## interceptor相关配置 ########
    agent.sources.s1.interceptors = i1
    agent.sources.s1.interceptors.i1.type = com.flumePlugins.interceptor.ParseLogByRule$Builder


    ######## channel相关配置 ########
    # channel类型
    agent.channels.c1.type = file
    # 数据存放路径
    agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
    # 检查点路径
    agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
    # channel中最多缓存多少
    agent.channels.c1.capacity = 1000
    # channel一次最多吐给sink多少
    agent.channels.c1.transactionCapacity = 100


    ######## sink相关配置 ########
    # sink类型
    agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
    # brokers地址
    agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
    # topic
    agent.sinks.r1.kafka.topic = testTopic3
    # 压缩
    agent.sinks.r1.kafka.producer.compression.type = snappy


启动flume-agent


bin/flume-ng agent --conf conf/ -f conf/agent.conf -Dflume.root.logger=INFO,console -name agent


启动kafka-console-consumer


./kafka-console-consumer --topic testTopic3 --bootstrap-server localhost:9092


写入测试数据到监控目录


#日志数据
log='{
"host":"www.baidu.com",
"user_id":"197878787878787",
"items":[
    {
        "item_type":"clothes",
        "active_time":18989989
    },
    {
        "item_type":"car",
        "active_time":18989989
    }
 ]
}'
#日志追加到文件
echo $log>> /Users/wangpei/tempData/flume/data/test.log


查看发送到kafka中的数据


#可以看到一条数据按规则被解析成了两条
{"active_time":18989989,"user_id":"197878787878787","item_type":"clothes","host":"www.baidu.com"}
{"active_time":18989989,"user_id":"197878787878787","item_type":"car","host":"www.baidu.com"}


自定义拦截器要点


A、实现Interceptor接口中intercept(Event event)方法和intercept(List<Event> events)方法。


B、创建内部类Builder实现Interceptor.Builder接口。


C、注意对异常数据的处理。防止Agent奔溃。


???结


通过拦截器确实实现了这一功能,为解决这一类问题提供了一种很好的思路,但逻辑太过复杂反而会降低flume同步效率,生产环境下还要多加验证。


首页 上一页 1 2 3 下一页 尾页 3/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Elasticsearch Kibana查询语法 下一篇Python基础教程:for 循环语句 与..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目