This post builds a real-time log processing pipeline with Flume + Kafka + Spark Streaming: Flume acts as the Kafka producer, and Spark Streaming acts as the Kafka consumer.
Only Flume 1.6.0 and later can integrate with Kafka directly; earlier versions do not provide a Kafka sink. Create a topic named flumeTopic in Kafka, then use Flume to monitor five log files that are updated in real time. The Flume configuration file is as follows:
a1.sources = s1 s2 s3 s4 s5
a1.channels = c1
a1.sinks = k1
# define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /nfs0/log/log1.log
a1.sources.s1.shell = /bin/sh -c
a1.sources.s2.type = exec
a1.sources.s2.command = tail -F /nfs0/log/log2.log
a1.sources.s2.shell = /bin/sh -c
a1.sources.s3.type = exec
a1.sources.s3.command = tail -F /nfs0/log/log3.log
a1.sources.s3.shell = /bin/sh -c
a1.sources.s4.type = exec
a1.sources.s4.command = tail -F /nfs0/log/log4.log
a1.sources.s4.shell = /bin/sh -c
a1.sources.s5.type = exec
a1.sources.s5.command = tail -F /nfs0/log/log5.log
a1.sources.s5.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
#define the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = master11:9092,slave11:9092,slave12:9092
a1.sinks.k1.topic = flumeTopic
# bind the sources and sink to the channel
a1.sources.s1.channels = c1
a1.sources.s2.channels = c1
a1.sources.s3.channels = c1
a1.sources.s4.channels = c1
a1.sources.s5.channels = c1
a1.sinks.k1.channel = c1
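With the configuration in place, the topic can be created and the agent launched along these lines. This is a sketch: the ZooKeeper address (`master11:2181`), the replication factor and partition count, and the config file name `flume-kafka.conf` are assumptions, not values from the original post.

```shell
# Create the flumeTopic topic once (ZooKeeper address, partitions,
# and replication factor are assumptions; adjust to your cluster)
bin/kafka-topics.sh --create --zookeeper master11:2181 \
  --replication-factor 2 --partitions 3 --topic flumeTopic

# Start the Flume agent defined above; "flume-kafka.conf" is a
# hypothetical file name for the configuration shown in this post
bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf \
  --name a1 -Dflume.root.logger=INFO,console
```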
Start Flume and Kafka; Spark Streaming can then consume the data in Kafka in real time.
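A minimal consumer sketch in PySpark, using the direct Kafka stream API from the spark-streaming-kafka 0.8 integration that matches this Flume 1.6 era. The broker list mirrors the Flume sink config above; the app name and 5-second batch interval are assumptions. Submit it with the matching `spark-streaming-kafka` package on the classpath.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="FlumeKafkaLogs")  # app name is an assumption
ssc = StreamingContext(sc, 5)                # 5-second micro-batches

# Direct stream against the same brokers the Flume Kafka sink writes to
stream = KafkaUtils.createDirectStream(
    ssc, ["flumeTopic"],
    {"metadata.broker.list": "master11:9092,slave11:9092,slave12:9092"})

# Each record is a (key, value) pair; the value is one tailed log line
lines = stream.map(lambda kv: kv[1])
lines.count().pprint()  # print the number of log lines in each batch

ssc.start()
ssc.awaitTermination()
```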