This post combines the previous two articles, now using Kafka as the channel:
1. Flume imports local log files into Kafka, then from Kafka into HDFS:
https://blog.csdn.net/m0_37890482/article/details/81126522
2. Flume sorts files into date-named HDFS directories based on the date at the start of each line:
https://blog.csdn.net/m0_37890482/article/details/81130616
Those setups used memory as the channel; here it is switched to a Kafka channel. All configuration files sit in /etc/flume-ng/conf by default.
Straight to the config file: kafka-in.conf (this one pulls logs from a local directory; strictly speaking the sink section need not be written here, since the sink belongs in kafka-out.conf).
#------ local log files to Kafka: kafka-in.conf ------#
#--------------Edit by cheengvho-------------#
# Name the agent's components
agent1.sources = file_source
agent1.sinks = kafka_sink
agent1.channels = kafka_channel
#-------file_source (directory to watch)---------
agent1.sources.file_source.interceptors = i1
agent1.sources.file_source.interceptors.i1.type = regex_extractor
# the single capturing group must be the leading date, so serializer s1 maps to it
agent1.sources.file_source.interceptors.i1.regex = ^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
agent1.sources.file_source.interceptors.i1.serializers = s1
agent1.sources.file_source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent1.sources.file_source.interceptors.i1.serializers.s1.name = timestamp
agent1.sources.file_source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
agent1.sources.file_source.type = spooldir
agent1.sources.file_source.spoolDir = /flume/kafka_logs
#agent1.sources.file_source.deletePolicy = immediate
#------------kafka_sink--------------------
#agent1.sinks.kafka_sink.type = logger
agent1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka_sink.topic = access
agent1.sinks.kafka_sink.brokerList = localhost:9092
agent1.sinks.kafka_sink.requiredAcks = 1
agent1.sinks.kafka_sink.batchSize = 1000
#-------------kafka_channel-------------------
agent1.channels.kafka_channel.type = org.apache.flume.channel.kafka.KafkaChannel
# older Flume releases read brokerList/zookeeperConnect; 1.7+ reads kafka.bootstrap.servers
agent1.channels.kafka_channel.brokerList = localhost:9092
agent1.channels.kafka_channel.kafka.bootstrap.servers = localhost:9092
agent1.channels.kafka_channel.zookeeperConnect = localhost:2181
agent1.channels.kafka_channel.kafka.topic = access
agent1.channels.kafka_channel.kafka.consumer.group.id = flume-consumer
# capacity/transactionCapacity are memory-channel settings; the Kafka channel ignores them
agent1.channels.kafka_channel.capacity = 10000
agent1.channels.kafka_channel.transactionCapacity = 1000
#------------------------------------------
agent1.sources.file_source.channels = kafka_channel
agent1.sinks.kafka_sink.channel = kafka_channel
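As a quick sanity check on the interceptor, the extraction it performs can be imitated with grep and date (the sample log line, and using UTC for reproducibility, are my assumptions; the MillisSerializer itself turns the matched string into epoch millis for the event's timestamp header):

```shell
# A sample line in the assumed format: a "yyyy-MM-dd HH:mm" timestamp up front.
line="2018-07-20 10:30 GET /index.html 200"

# Same pattern as the interceptor regex: grab the leading date.
ts=$(echo "$line" | grep -oP '^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}')
echo "$ts"                       # -> 2018-07-20 10:30

# What the MillisSerializer stores in the "timestamp" header: epoch millis.
TZ=UTC date -d "$ts" +%s000      # -> 1532082600000 (UTC; your local zone will differ)
```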
Now the kafka-out.conf config (likewise, the source section does not need to be written here; the source lives in kafka-in.conf).
Note that filePrefix and fileSuffix here easily fail to take effect. Two details fix it: the first letter after "file" must be capitalized, and the property must carry the hdfs. prefix (i.e. hdfs.filePrefix, hdfs.fileSuffix).
#------------------kafka-out.conf-------------------#
#----------------Edit by cheengvho------------------#
# ------------------ define the data flow ----------------------#
# source name
agent2.sources = kafka_source
# channel name (naming it after its type is recommended)
agent2.channels = kafka_channel
# sink name (naming it after its destination is recommended)
agent2.sinks = hdfs_sink
#-------- kafka_source settings -----------------
# define the source type
agent2.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafka_source.channels = kafka_channel
agent2.sources.kafka_source.batchSize = 5000
agent2.sources.kafka_source.zookeeperConnect = localhost:2181
# on Flume 1.7+ these are kafka.bootstrap.servers / kafka.topics / kafka.consumer.group.id
agent2.sources.kafka_source.bootstrap.servers = localhost:9092
agent2.sources.kafka_source.topic = access
agent2.sources.kafka_source.consumer.group.id = flumehdfs
#--------- hdfs_sink settings ------------------
agent2.sinks.hdfs_sink.type = hdfs
agent2.sinks.hdfs_sink.channel = kafka_channel
agent2.sinks.hdfs_sink.hdfs.filePrefix = %Y-%m-%d
agent2.sinks.hdfs_sink.hdfs.fileSuffix = .log
agent2.sinks.hdfs_sink.hdfs.path = /loudacre/kafka/%Y%m%d
agent2.sinks.hdfs_sink.hdfs.rollSize = 524288
agent2.sinks.hdfs_sink.hdfs.rollCount = 0
agent2.sinks.hdfs_sink.hdfs.rollInterval = 0
agent2.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent2.sinks.hdfs_sink.hdfs.fileType = DataStream
agent2.sinks.hdfs_sink.hdfs.writeFormat = Text
#------- kafka_channel settings -------------------------
agent2.channels.kafka_channel.zookeeperConnect = localhost:2181
agent2.channels.kafka_channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent2.channels.kafka_channel.brokerList = localhost:9092
agent2.channels.kafka_channel.kafka.bootstrap.servers = localhost:9092
agent2.channels.kafka_channel.kafka.topic = access
agent2.channels.kafka_channel.kafka.consumer.group.id = flume-consumer
agent2.channels.kafka_channel.capacity = 100000
agent2.channels.kafka_channel.transactionCapacity = 10000
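To see what the %-escapes in hdfs.path and hdfs.filePrefix expand to, date can mimic the substitution the HDFS sink performs from the event's timestamp header (the sample timestamp below is an assumption):

```shell
# Event timestamp as extracted by the interceptor on the ingest side.
ts="2018-07-20 10:30"

# hdfs.path = /loudacre/kafka/%Y%m%d resolves to a per-day directory:
date -d "$ts" +/loudacre/kafka/%Y%m%d    # -> /loudacre/kafka/20180720

# hdfs.filePrefix = %Y-%m-%d resolves to the file name prefix:
date -d "$ts" +%Y-%m-%d                  # -> 2018-07-20
```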
As before, start ZooKeeper, Kafka, and Flume.
Note the startup order: start Flume with kafka-out.conf first, then open another terminal and start Flume with kafka-in.conf. If nothing errors out, check whether the local files have been renamed with the ".COMPLETED" suffix, and then whether the output files have appeared in HDFS.
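The startup order above, sketched as commands (the Kafka script locations are assumptions for a plain Apache Kafka install; on CDH you would start those services through the cluster manager instead):

```shell
# Start ZooKeeper and Kafka first (paths are assumptions; adjust to your install).
zookeeper-server-start.sh config/zookeeper.properties &
kafka-server-start.sh config/server.properties &

# Start the HDFS-writing agent (kafka-out.conf) first...
flume-ng agent -n agent2 -c /etc/flume-ng/conf \
    -f /etc/flume-ng/conf/kafka-out.conf -Dflume.root.logger=INFO,console

# ...then, in a second terminal, the ingesting agent (kafka-in.conf).
flume-ng agent -n agent1 -c /etc/flume-ng/conf \
    -f /etc/flume-ng/conf/kafka-in.conf -Dflume.root.logger=INFO,console
```

Note that the -n value must match the agent name used in the config file (agent2 and agent1 respectively), or the agent starts with no components.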
Since quite a few people turn out to read this, a special note: the sink section in kafka-in.conf and the source section in kafka-out.conf are both wrong demonstrations on purpose; when you take this code to test, keep only the parts described above.
If this doesn't solve your problem, you can also leave a comment; I check in occasionally. But whether I see it is entirely up to fate, because~
Fate! It works in mysterious ways!