版权声明:Designed By JiaMingcan https://blog.csdn.net/qq_41571900/article/details/84666854
前期工作
先部署Kafka集群和Flume。
Flume在解压后只需要在flume-env.sh中修改jdk路径就行了。
而Kafka集群相比较下就有点复杂了,步骤如下:
- 解压kafka的压缩包。
- vi /config/server.properties,在其中进行下面4,5,6步
- broker.id=0 ,这个是每台机器的标识,不可重复。
- delete.topic.enable=true,这个是确保删除Topic否则删除时不仅要清除本地数据,还要清除ZK上的数据。
- log.dirs=/opt/module/kafka/logs,配置自己想存储的路径,这里不仅是日志,它也是数据存储的地方。
- zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181,配置ZK集群的ip地址及端口号。
- 配置环境变量,即在/etc/profile下增加PATH。
- 分发一下kafka到集群其他机器上,记得修改broker.id。
- 在每台机器的kafka文件夹下,使用bin/kafka-server-start.sh config/server.properties & 打开kafka服务。
操作流程
在Flume文件夹下,创建一个job文件夹
mkdir job
在job中创建flume-kafka.conf文件,代码操作如下:
vi job/flume-kafka.conf
//文件代码如下
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/test/result.txt //修改为自己读取的文件
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test //读取的文件放在哪个Topic下
a1.sinks.k1.brokerList = 192.168.85.128:9092 //写kafka集群一台broker的ip地址以及端口就行
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
之后,在flume的文件夹下使用
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-kafka.conf
//文件中的a1正是对应了这点的name
我们可以在kafka中启动消费者,相当于将结果打印到控制台上,以验证程序的运行。
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test
这样就完成了flume交互kafka,我们可以使用flume从日志中读取数据,并存放到kafka的Topic下,以便我们使用MapReduce或者SparkStreaming以及Flink来处理数据。
compiled up by JiaMingcan
转载请署名:JiaMingcan