设为首页 加入收藏

TOP

kafka无法收到flume采集的数据的解决办法
2018-11-28 17:55:49 】 浏览:24
Tags:kafka 无法 收到 flume 采集 数据 解决 办法
版权声明:如需转载,请注明出处 https://blog.csdn.net/whdxjbw/article/details/81160350

问题重现

在写黑名单那篇博文的时候,我是通过直接copy log日志文件到监控目录下的方式来模拟数据的,在前几次模拟访问日志文件的时候挺正常的,copy进去基本都是秒采集(文件显示直接加了.COMPLETED后缀)。

但到后来再往采集目录下copy log日志文件的时候,待采集目录下的文件并不会显示被采集(文件没有.COMPLETED后缀),kafka也一直收不到flume采集来的数据。但重启flume客户端后又会正常采集。经过多方排查后,最终定位问题在flume配置文件有错,之前flume配置文件如下:

flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3

# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.channels = c3
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true

# 配置channel,将buffer事件放在内存中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 100000
flume2kafka.channels.c3.transactionCapacity = 1000

# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.9.119:9092,172.18.9.120:9092,172.18.9.121:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.requiredAcks = 0
flume2kafka.sinks.k3.batchSize = 20
#flume2kafka.sinks.k3.channel = memcnl

# 把source和sink绑定在channel上
flume2kafka.sources.r3.channels=c3

解决方案

检查source和channel部分配置都没问题,后来发现是sinks.k3.requiredAcks这一参数导致的问题,之前是0,现在改为1后

配置文件如下,重新运行flume客户端,发现kafka消费端会收到数据了,问题解决。

flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3

# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true

# 配置channel,将buffer事件放在内存中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 10000
flume2kafka.channels.c3.transactionCapacity = 1000

# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.9.119:9092,172.18.9.120:9092,172.18.9.121:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.serializer.class=kafka.serializer.StringEncoder
flume2kafka.sinks.k3.requiredAcks = 1
flume2kafka.sinks.k3.batchSize = 20

# 把source和sink绑定在channel上
flume2kafka.sources.r3.channels=c3
flume2kafka.sinks.k3.channel=c3

原因分析

我们先看下 request.required.acks 这个参数到底是什么玩意。

它其实是配置 kafka 中的 ack确认机制的参数,kafka 中有三种确认机制,如下:

  • 机制一:producer 端不等待来自 broker 的确认消息,直接发送下一条消息。
  • 机制二:producer 端会在得到 leader broker 确认收到数据消息后,才发送下一条消息。
  • 机制三:producer 端会在得到所有 follower broker 副本确认收到数据消息后,才会发送下一条数据。

现在大家应该就明白requiredAcks 参数的意义了,它就是制定需要多少副本确认收到消息才会被认定为成功写入。不难发现,三种机制,kafka 性能依次递减 (producer吞吐量降低),但数据安全性依次递增。

request.required.acks 意义
0 代表不需要等待任何 broker 的确认,对应于机制一。
1 代表仅需要角色为 leader 的 broker 确认收到消息即可,对应于机制二。
-1 代表需要所有 follower broker 副本都需要确认收到消息才行,对应于机制三。

我们之前遇到的问题,可能是 kafka 的 leader broker 挂掉了,但由于request.required.acks 参数指定的是0,说明 producer 端不需要经过broker端的确认就可以发下一条数据,然而此时我们 broker 已经挂了,你再写就没用了。故在保持性能的情况下,将此参数改为1即可。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flume监控目录并读取新添加的文件 下一篇flume 的source 、channel和sink ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目