1. Flume configuration
Flume 1.6 or later is required.
Contents of the flume-conf.properties file; the sink's output serves as the Kafka producer.
a1.sources=r1
a1.sinks=k1
a1.channels=c1

# Describe/configure the source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/airib/work/log.log

# Describe the sink
#a1.sinks.k1.type=logger
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=test
a1.sinks.k1.brokerList=localhost:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
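Once the agent is running, you can check that log lines actually reach Kafka. A minimal sanity check, assuming a local Kafka 0.8/0.9-era installation (matching the old high-level consumer API used below), with the broker and ZooKeeper already started and the test topic either created or auto-created by the broker:

# Append a test line to the file tailed by the exec source
echo "hello flume-kafka" >> /home/airib/work/log.log

# Read it back from the test topic with Kafka's console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning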
2. Kafka consumer Java source code
package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();