一、flume配置
flume要求1.6以上版本
flume-conf.properties文件配置内容，sink的输出作为kafka的producer
-
a1.sources=r1
-
a1.sinks=k1
-
a1.channels=c1
-
-
#Describe/configurethesource
-
a1.sources.r1.type=exec
-
a1.sources.r1.command=tail-F/home/airib/work/log.log
-
-
#Describethesink
-
#a1.sinks.k1.type=logger
-
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
-
a1.sinks.k1.topic=test
-
a1.sinks.k1.brokerList=localhost:9092
-
a1.sinks.k1.requiredAcks=1
-
a1.sinks.k1.batchSize=20
-
-
#Useachannelwhichbufferseventsinmemory
-
a1.channels.c1.type=memory
-
a1.channels.c1.capacity=1000
-
a1.channels.c1.transactionCapacity=100
-
-
#Bindthesourceandsinktothechannel
-
a1.sources.r1.channels=c1
-
a1.sinks.k1.channel=c1
flume启动
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
二 kafka的消费者java源代码
-
package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Console consumer for the "test" topic using the legacy Kafka 0.8
 * high-level consumer API (ZooKeeper-based ConsumerConnector).
 * Prints every received message value to stdout; runs until killed.
 */
public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper ensemble the consumer coordinates through
        props.put("zookeeper.connect", "localhost:2181");
        // consumer group id — all members of a group share the partitions
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // "smallest" = start from the earliest offset when no committed offset exists
        props.put("auto.offset.reset", "smallest");
        // serializer class (producer-side setting; harmless here)
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Opens one stream on topic "test" and blocks forever, printing each
     * message value as a String.
     */
    void consume() {
        // topic -> number of consumer threads/streams to open for it
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // hasNext() blocks until a message arrives, so this loop never exits on its own
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
kafka启动命令
启动Zookeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动Kafka server:
bin/kafka-server-start.sh config/server.properties &
运行producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning