设为首页 加入收藏

TOP

log4j结合apache flume输出日志到apache kafka
2018-11-29 03:20:44 】 浏览:122
Tags:log4j 结合 apache flume 输出 日志 kafka
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/minicto/article/details/54025635

flume是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

在最近的项目上,需要统一收集日志,然后将日志发送到kafka,架构流程如下:
image

在每个srv中,通过log4j中flume appender,将日志输出到kafka srv上,在flume srv上配置redource为netcat模式,如下:

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444

上面配置了flume srv运行在10.99.70.51的服务器上,并且监听的端口为44444,在log4j中只需要将日志发送到10.99.70.51的44444端口就能成功的发动到flume上。

在flume中,只需要将sink指定到kafka,如下:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytest
a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

其中kafka.bootstrap.servers需要指定kafka的集群地址,kafka.topic需要指定kafka中的topic。

如上配置,当app srv中通过log4j输出一条日志的时候,会通过flume appender发送到10.9.70.51机器44444端口上,flume会监听并收集该端口上的数据信息,然后将他转化成kafka event,并发送到kafka集群mytest topic下。

完整配置,如下

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.99.70.51
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.kafka.topic = mytest
#a1.sinks.k1.kafka.bootstrap.servers = 10.99.70.51:9092,10.99.70.52:9092,10.99.70.53:9092
#a1.sinks.k1.kafka.flumeBatchSize = 20
#a1.sinks.k1.kafka.producer.acks = 1
#a1.sinks.k1.kafka.producer.linger.ms = 1
#a1.sinks.ki.kafka.producer.compression.type = snappy

==手机QQ扫描下方二维码,快速加入Java架构师交流群==

image

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume1.7.0-taildirSource 支持.. 下一篇nginx ---->flume ----->ka..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目