Flume: reading log data and writing it to Kafka
2018-11-13 16:14:33
Tags: flume, read, log, data, write, kafka

1. Flume configuration

Flume 1.6 or later is required; the built-in Kafka sink (org.apache.flume.sink.kafka.KafkaSink) first shipped in Flume 1.6.
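To check which version is installed, run the following from the Flume home directory:

bin/flume-ng version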

Configuration for the flume-conf.properties file; the sink's output acts as the Kafka producer (a standalone producer sketch follows the config for comparison):

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/airib/work/log.log

# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
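As a point of comparison, here is a minimal sketch of what the KafkaSink above effectively does: publish each event to the test topic as a Kafka producer. It uses the same legacy Kafka 0.8 API as the consumer in the next section; the class name LogProducer and the sample message are illustrative additions, not part of the original setup.

package com.hgp.kafka.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Illustrative stand-in for what KafkaSink does with each Flume event.
public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker as a1.sinks.k1.brokerList in the Flume config
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Same ack level as a1.sinks.k1.requiredAcks
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // Publish one sample line to the "test" topic (a1.sinks.k1.topic)
        producer.send(new KeyedMessage<String, String>("test", "hello from LogProducer"));
        producer.close();
    }
}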


Starting Flume:

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
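Once the agent is running, the pipeline can be exercised by appending a line to the tailed file; tail -F in the exec source should pick it up and KafkaSink should publish it to the test topic (the message text here is arbitrary):

echo "hello flume" >> /home/airib/work/log.log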


2. Kafka consumer Java source code

package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "localhost:2181");
        // group.id identifies a consumer group
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        // One stream for the "test" topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
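This code uses the legacy high-level consumer API (kafka.consumer.*, kafka.javaapi.consumer.*), which was removed from the Kafka clients in later releases, so it has to be built against a 0.8.x Kafka jar. A plausible Maven dependency, assuming Kafka 0.8.2 built for Scala 2.10, is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>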

Kafka startup commands

Start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties &


Start the Kafka server:

bin/kafka-server-start.sh config/server.properties &


Run the console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


Run the console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
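With ZooKeeper, the Kafka broker, and the Flume agent all running, lines appended to /home/airib/work/log.log (see the echo test above) should appear both in the Java consumer's output and in the console consumer.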
