Flume: import log files from the local filesystem into Kafka, then from Kafka into HDFS, using Kafka as the channel
2019-04-23 14:29:48

This post pulls together the previous two articles and switches to Kafka as the channel:

1. Flume imports log files from the local filesystem into Kafka, then from Kafka into HDFS. Link:

https://blog.csdn.net/m0_37890482/article/details/81126522#0-qzone-1-10214-d020d2d2a4e8d1a374a433f596ad1440

2. Flume sorts files into dated HDFS directories based on the date at the start of each line. Link:

https://blog.csdn.net/m0_37890482/article/details/81130616#0-qzone-1-32278-d020d2d2a4e8d1a374a433f596ad1440

It is a follow-up to those posts: there the channel was a memory channel, which is now replaced by a Kafka channel. All configuration files live in the default location /etc/flume-ng/conf.

Straight to the configuration files. kafka-in.conf (this is the config that picks up logs from the local filesystem; strictly speaking the sink section does not need to be written here, since the sink side belongs in kafka-out.conf — a trimmed sketch follows the full config below):

#----- kafka-in: local log files into the Kafka channel -----#
#--------------Edit by cheengvho-------------#

# Name the agent's components
agent1.sources = file_source
agent1.sinks = kafka_sink
agent1.channels = kafka_channel

#-------file_source (directory to watch)---------
agent1.sources.file_source.interceptors = i1
agent1.sources.file_source.interceptors.i1.type = regex_extractor
agent1.sources.file_source.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
agent1.sources.file_source.interceptors.i1.serializers = s1
agent1.sources.file_source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent1.sources.file_source.interceptors.i1.serializers.s1.name = timestamp
agent1.sources.file_source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

agent1.sources.file_source.type = spooldir
agent1.sources.file_source.spoolDir = /flume/kafka_logs
#agent1.sources.file_source.deletePolicy = immediate

#------------kafka_sink--------------------
#agent1.sinks.kafka_sink.type = logger
agent1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka_sink.topic = access
agent1.sinks.kafka_sink.brokerList = localhost:9092
agent1.sinks.kafka_sink.requiredAcks = 1
agent1.sinks.kafka_sink.batchSize = 1000

#-------------kafka_channel-------------------
agent1.channels.kafka_channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka_channel.brokerList = localhost:9092
agent1.channels.kafka_channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.kafka_channel.zookeeperConnect = localhost:2181
agent1.channels.kafka_channel.kafka.topic = access
agent1.channels.kafka_channel.kafka.consumer.group.id = flume-consumer
agent1.channels.kafka_channel.capacity = 10000
agent1.channels.kafka_channel.transactionCapacity = 1000

#------------------------------------------
agent1.sources.file_source.channels = kafka_channel
agent1.sinks.kafka_sink.channel = kafka_channel
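Since the Kafka channel already persists every event straight into the Kafka topic, the sink block above is (as the author notes further down) only there for illustration and can be dropped. A trimmed sketch of kafka-in.conf with just the source and the Kafka channel (same values as above, interceptor lines omitted for brevity) would be:

#----- kafka-in.conf, trimmed: source + Kafka channel only, no sink -----#
agent1.sources = file_source
agent1.channels = kafka_channel

agent1.sources.file_source.type = spooldir
agent1.sources.file_source.spoolDir = /flume/kafka_logs
agent1.sources.file_source.channels = kafka_channel

agent1.channels.kafka_channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka_channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.kafka_channel.kafka.topic = access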

Configuration file kafka-out.conf (likewise, the source section does not need to be written here, since the source side lives in kafka-in.conf — a trimmed sketch follows the full config below):

Note that filePrefix and fileSuffix here tend not to take effect. Two details fix it: the first letter after "file" must be capitalized (filePrefix / fileSuffix), and the property must carry the "hdfs." prefix (e.g. hdfs.filePrefix).

#------------------kafka-out.conf-------------------#
#----------------Edit by cheengvho------------------#
# ------------------ Define the data flow ----------------------#
# Name of the source
agent2.sources = kafka_source
# Name of the channel; naming it after its type is recommended
agent2.channels = kafka_channel
# Name of the sink; naming it after its destination is recommended
agent2.sinks = hdfs_sink

#-------- kafka_source settings -----------------
# Define the source type
# For each one of the sources, the type is defined  
agent2.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafka_source.channels = kafka_channel
agent2.sources.kafka_source.batchSize = 5000
agent2.sources.kafka_source.zookeeperConnect = localhost:2181
agent2.sources.kafka_source.bootstrap.servers = localhost:9092
agent2.sources.kafka_source.topic = access
agent2.sources.kafka_source.consumer.group.id = flumehdfs

#--------- hdfs_sink settings ------------------
agent2.sinks.hdfs_sink.type = hdfs
agent2.sinks.hdfs_sink.channel = kafka_channel
agent2.sinks.hdfs_sink.hdfs.filePrefix = %Y-%m-%d
agent2.sinks.hdfs_sink.hdfs.fileSuffix = .log
agent2.sinks.hdfs_sink.hdfs.path = /loudacre/kafka/%Y%m%d
agent2.sinks.hdfs_sink.hdfs.rollSize = 524288
agent2.sinks.hdfs_sink.hdfs.rollCount = 0
agent2.sinks.hdfs_sink.hdfs.rollInterval = 0
agent2.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent2.sinks.hdfs_sink.hdfs.fileType=DataStream
agent2.sinks.hdfs_sink.hdfs.writeFormat=Text

#------- kafka_channel settings -------------------------
agent2.channels.kafka_channel.zookeeperConnect = localhost:2181
agent2.channels.kafka_channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent2.channels.kafka_channel.brokerList = localhost:9092
agent2.channels.kafka_channel.kafka.bootstrap.servers = localhost:9092
agent2.channels.kafka_channel.kafka.topic = access
agent2.channels.kafka_channel.kafka.consumer.group.id = flume-consumer
agent2.channels.kafka_channel.capacity = 100000
agent2.channels.kafka_channel.transactionCapacity = 10000
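On the consuming side the same logic applies: the Kafka channel reads from the topic by itself, so the kafka_source block is redundant. A trimmed kafka-out.conf (sketch) only declares the channel and the HDFS sink:

#----- kafka-out.conf, trimmed: Kafka channel + HDFS sink only, no source -----#
agent2.channels = kafka_channel
agent2.sinks = hdfs_sink
# keep the hdfs_sink and kafka_channel blocks above unchanged,
# and drop all agent2.sources.* lines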

As before, start ZooKeeper, Kafka, and Flume.

Note that Flume must be started with kafka-out.conf first; then open another terminal and start Flume with kafka-in.conf. If there are no errors, first check whether the local files have been renamed with the ".COMPLETED" suffix, then check whether the files have appeared in HDFS.
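For reference, the startup sequence could look roughly like this (a sketch; script paths, the flume-ng binary location and the exact options depend on your Kafka and Flume installation):

# 1. Start ZooKeeper and the Kafka broker (paths assume a plain Kafka tarball install)
zookeeper-server-start.sh config/zookeeper.properties &
kafka-server-start.sh config/server.properties &

# 2. Start the consumer-side agent first (Kafka -> HDFS); the name must match the config (agent2)
flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka-out.conf --name agent2 -Dflume.root.logger=INFO,console

# 3. In a second terminal, start the producer-side agent (local files -> Kafka), name agent1
flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/kafka-in.conf --name agent1 -Dflume.root.logger=INFO,console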

Since quite a few people seem to be reading this, one special note: the sink section in kafka-in.conf and the source section in kafka-out.conf are both wrong examples; when you take this code and test it, only keep the parts described above.

If this still doesn't solve your problem, feel free to leave a comment; I check from time to time, but whether I see it is entirely a matter of fate, because...

Fate works in mysterious ways!
