设为首页 加入收藏

TOP

kafka和flume集成
2019-04-18 14:14:38 】 浏览:63
Tags:kafka flume 集成

使用flume+kafka+sparkstreaming进行日志实时处理,flume作为kafka的producer,sparkstreaming作为kafka的消费者。

flume只有1.6.0和以上的版才可以和kafka集成,1.6.0之前的版本没有提供kafka sink这个功能,在kafka中创建一个flumeTopic topic,然后使用flume 监控五个日志文件,五个文件实时更新,flume配置文件如下

a1.sources = s1 s2 s3 s4 s5
a1.channels = c1
a1.sinks = k1


# define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /nfs0/log/log1.log
a1.sources.s1.shell = /bin/sh -c


a1.sources.s2.type = exec
a1.sources.s2.command = tail -F /nfs0/log/log2.log
a1.sources.s2.shell = /bin/sh -c


a1.sources.s3.type = exec
a1.sources.s3.command = tail -F /nfs0/log/log3.log
a1.sources.s3.shell = /bin/sh -c




a1.sources.s4.type = exec
a1.sources.s4.command = tail -F /nfs0/log/log4.log
a1.sources.s4.shell = /bin/sh -c




a1.sources.s5.type = exec
a1.sources.s5.command = tail -F /nfs0/log/log5.log
a1.sources.s5.shell = /bin/sh -c


#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000


#define the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = master11:9092,slave11:9092,slave12:9092
a1.sinks.k1.topic = flumeTopic


# zuhe
a1.sources.s1.channels = c1
a1.sources.s2.channels = c1
a1.sources.s3.channels = c1
a1.sources.s4.channels = c1
a1.sources.s5.channels = c1

a1.sinks.k1.channel = c1


启动flume和kafka,此时sparkstreaming就可以实时消费kafka中的数据了

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇ICC副本>>>>(logback.. 下一篇Flume的安装与配置(分布式日志收..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目