Reading log data with Flume and writing it to Kafka, then integrating Kafka with Storm

1. Flume configuration

Flume 1.6 or later is required, since the built-in Kafka sink first shipped in that release.
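To check which version is installed, the Flume launcher script has a version subcommand:

bin/flume-ng version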

Contents of the flume-conf.properties file; the sink's output acts as the Kafka producer:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/airib/work/log.log

# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
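For reference, the KafkaSink configured above behaves like an ordinary Kafka producer: each Flume event body is published as one message on the test topic. Below is a minimal sketch of an equivalent standalone producer using the same 0.8-era Kafka Java API as the consumer code in section 2; the class name LogProducer and the sample message are hypothetical, not part of the article's code.

package com.hgp.kafka.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical illustration of what the Flume KafkaSink does per event
public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker and ack level as the sink config above
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // One Flume event body becomes one message on the "test" topic
        producer.send(new KeyedMessage<String, String>("test", "hello kafka"));
        producer.close();
    }
}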


Starting Flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
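Once the agent is running, lines appended to /home/airib/work/log.log should show up as messages on the topic. One way to verify this, assuming Kafka's bundled console consumer and a local ZooKeeper on the default port:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning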


2. Kafka consumer Java source code

package com.hgp.kafka.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    // High-level consumer connector from the old (0.8-era) Kafka API
    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();