1. 配置flume
kafka-source.properties
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.source1.batchSize = 5000
agent1.sources.source1.batchDurationMillis = 2000
agent1.sources.source1.kafka.bootstrap.servers = centos1:9092
agent1.sources.source1.kafka.topics = mytopic
agent1.sources.source1.kafka.consumer.group.id = group1
agent1.sinks.sink1.type=logger
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1000
agent1.channels.channel1.transactionCapacity=100
agent1.sources.source1.channels=channel1
agent1.sinks.sink1.channel=channel1
2. 启动flume
flume-ng agent -n agent1 -c conf -f conf/kafka-source.properties -Dflume.root.logger=INFO,console
3.启动Kafka producer
kafka-console-producer.sh --broker-list centos1:9092 --topic mytopic
参考
http://flume.apache.org/FlumeUserGuide.html