{"rsdb":{"rid":"309091","subhead":"","postdate":"0","aid":"224253","fid":"119","uid":"1","topic":"1","content":"
\n\t\t\t\t\t\t\t\t\n \t\t\t\t\t\t\t\t \n\t\t\t\t\t\t
\n \r\n

\r\n\u4e00\u3001flume\u914d\u7f6e<\/p>\r\n

\r\nflume\u8981\u6c421.6\u4ee5\u4e0a\u7248\u672c<\/p>\r\n

\r\nflume-conf.properties\u6587\u4ef6\u914d\u7f6e\u5185\u5bb9\uff0csinks\u7684\u8f93\u51fa\u4f5c\u4e3akafka\u7684product
\r\n<\/p>\r\n

\r\n<\/p>\r\n

\r\n
\r\n
\r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
\r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
    \r\n
  1. \r\na1.sources<\/span>=<\/span>r1<\/span><\/span><\/span><\/li>
  2. \r\na1.sinks<\/span>=<\/span>k1<\/span><\/span><\/span><\/li>
  3. \r\na1.channels<\/span>=<\/span>c1<\/span><\/span><\/span><\/li>
  4. \r\n<\/span><\/li>
  5. \r\n#Describe\/configurethesource<\/span><\/li>
  6. \r\na1.sources.r1.type<\/span>=<\/span>exec<\/span><\/span><\/span><\/li>
  7. \r\na1.sources.r1.command<\/span>=<\/span>tail<\/span>-F\/home\/airib\/work\/log.log<\/span><\/span><\/li>
  8. \r\n<\/span><\/li>
  9. \r\n#Describethesink<\/span><\/li>
  10. \r\n#a1.sinks.k1.type<\/span>=<\/span>logger<\/span><\/span><\/span><\/li>
  11. \r\na1.sinks.k1.type<\/span>=<\/span>org<\/span>.apache.flume.sink.kafka.KafkaSink<\/span><\/span><\/li>
  12. \r\na1.sinks.k1.topic<\/span>=<\/span>test<\/span><\/span><\/span><\/li>
  13. \r\na1.sinks.k1.brokerList<\/span>=<\/span>localhost<\/span>:9092<\/span><\/span><\/li>
  14. \r\na1.sinks.k1.requiredAcks<\/span>=<\/span>1<\/span><\/span><\/span><\/li>
  15. \r\na1.sinks.k1.batchSize<\/span>=<\/span>20<\/span><\/span><\/span><\/li>
  16. \r\n<\/span><\/li>
  17. \r\n#Useachannelwhichbufferseventsinmemory<\/span><\/li>
  18. \r\na1.channels.c1.type<\/span>=<\/span>memory<\/span><\/span><\/span><\/li>
  19. \r\na1.channels.c1.capacity<\/span>=<\/span>1000<\/span><\/span><\/span><\/li>
  20. \r\na1.channels.c1.transactionCapacity<\/span>=<\/span>100<\/span><\/span><\/span><\/li>
  21. \r\n<\/span><\/li>
  22. \r\n#Bindthesourceandsinktothechannel<\/span><\/li>
  23. \r\na1.sources.r1.channels<\/span>=<\/span>c1<\/span><\/span><\/span><\/li>
  24. \r\na1.sinks.k1.channel<\/span>=<\/span>c1<\/span><\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n

    \r\n
    \r\n<\/p>\r\n

    \r\nflume\u542f\u52a8<\/p>\r\n

    \r\nbin\/flume-ng agent --conf conf --conf-file conf\/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
    \r\n<\/p>\r\n

    \r\n
    \r\n<\/p>\r\n\u4e8c kafka\u7684\u6d88\u8d39\u8005
    java<\/a>\u6e90\u4ee3\u7801<\/span>\r\n

    \r\n<\/p>\r\n

    \r\n<\/p>\r\n

    \r\n
    \r\n
    \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
    \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
      \r\n
    1. \r\npackagecom.hgp.kafka.kafka;<\/span><\/span><\/li>
    2. \r\n<\/span><\/li>
    3. \r\nimportjava<\/a>.util.HashMap;<\/span><\/li>
    4. \r\nimportjava.util.List;<\/span><\/li>
    5. \r\nimportjava.util.Map;<\/span><\/li>
    6. \r\nimportjava.util.Properties;<\/span><\/li>
    7. \r\n<\/span><\/li>
    8. \r\nimportkafka.consumer.ConsumerConfig;<\/span><\/li>
    9. \r\nimportkafka.consumer.ConsumerIterator;<\/span><\/li>
    10. \r\nimportkafka.consumer.KafkaStream;<\/span><\/li>
    11. \r\nimportkafka.javaapi.consumer.ConsumerConnector;<\/span><\/li>
    12. \r\nimportkafka.serializer.StringDecoder;<\/span><\/li>
    13. \r\nimportkafka.utils.VerifiableProperties;<\/span><\/li>
    14. \r\n<\/span><\/li>
    15. \r\npublicclassKafkaConsumer{<\/span><\/li>
    16. \r\n<\/span><\/li>
    17. \r\nprivatefinalConsumerConnectorconsumer;<\/span><\/li>
    18. \r\n<\/span><\/li>
    19. \r\nprivateKafkaConsumer(){<\/span><\/li>
    20. \r\nPropertiesprops<\/span>=<\/span>new<\/span>Properties();<\/span><\/span><\/li>
    21. \r\n\/\/zookeeper\u914d\u7f6e<\/span><\/li>
    22. \r\nprops.put(\"zookeeper.connect\",\"localhost:2181\");<\/span><\/li>
    23. \r\n<\/span><\/li>
    24. \r\n\/\/group\u4ee3\u8868\u4e00\u4e2a\u6d88\u8d39\u7ec4<\/span><\/li>
    25. \r\nprops.put(\"group.id\",\"jd-group\");<\/span><\/li>
    26. \r\n<\/span><\/li>
    27. \r\n\/\/zk\u8fde\u63a5\u8d85\u65f6<\/span><\/li>
    28. \r\nprops.put(\"zookeeper.session.timeout.ms\",\"4000\");<\/span><\/li>
    29. \r\nprops.put(\"zookeeper.sync.time.ms\",\"200\");<\/span><\/li>
    30. \r\nprops.put(\"auto.commit.interval.ms\",\"1000\");<\/span><\/li>
    31. \r\nprops.put(\"auto.offset.reset\",\"smallest\");<\/span><\/li>
    32. \r\n\/\/\u5e8f\u5217\u5316\u7c7b<\/span><\/li>
    33. \r\nprops.put(\"serializer.class\",\"kafka.serializer.StringEncoder\");<\/span><\/li>
    34. \r\n<\/span><\/li>
    35. \r\nConsumerConfigconfig<\/span>=<\/span>new<\/span>ConsumerConfig(props);<\/span><\/span><\/li>
    36. \r\n<\/span><\/li>
    37. \r\nconsumer<\/span>=<\/span>kafka<\/span>.consumer.Consumer.createJavaConsumerConnector(config);<\/span><\/span><\/li>
    38. \r\n}<\/span><\/li>
    39. \r\n<\/span><\/li>
    40. \r\nvoidconsume(){<\/span><\/li>
    41. \r\nMap<<\/span>String<\/span>,Integer<\/span>><\/span><\/span>topicCountMap<\/span>=<\/span>new<\/span>HashMap<\/span><<\/span>String<\/span>,Integer<\/span>><\/span>();<\/span><\/span><\/li>
    42. \r\ntopicCountMap.put(\"test\",newInteger(1));<\/span><\/li>
    43. \r\n<\/span><\/li>
    44. \r\nStringDecoderkeyDecoder<\/span>=<\/span>new<\/span>StringDecoder(newVerifiableProperties());<\/span><\/span><\/li>
    45. \r\nStringDecodervalueDecoder<\/span>=<\/span>new<\/span>StringDecoder(newVerifiableProperties());<\/span><\/span><\/li>
    46. \r\n<\/span><\/li>
    47. \r\nMap<<\/span>String<\/span>,List<\/span><<\/span>KafkaStream<\/span><<\/span>String<\/span>,String<\/span>><\/span>><\/span>><\/span><\/span>consumerMap<\/span>=<\/span><\/span><\/li>
    48. \r\nconsumer<\/span>.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);<\/span><\/span><\/li>
    49. \r\nKafkaStream<<\/span>String<\/span>,String<\/span>><\/span><\/span>stream<\/span>=<\/span>consumerMap<\/span>.get(\"test\").get(0);<\/span><\/span><\/li>
    50. \r\nConsumerIterator<<\/span>String<\/span>,String<\/span>><\/span><\/span>it<\/span>=<\/span>stream<\/span>.iterator();<\/span><\/span><\/li>
    51. \r\nwhile(it.hasNext())<\/span><\/li>
    52. \r\nSystem.out.println(it.next().message());<\/span><\/li>
    53. \r\n}<\/span><\/li>
    54. \r\n<\/span><\/li>
    55. \r\npublicstaticvoidmain(String[]args){<\/span><\/li>
    56. \r\nnewKafkaConsumer().consume();<\/span><\/li>
    57. \r\n}<\/span><\/li>
    58. \r\n}<\/span><\/li><\/ol>\r\n<\/div>\r\n
      \r\n

      \r\nkafka\u542f\u52a8\u547d\u4ee4<\/p>\r\n

      \r\n\u542f\u52a8Zookeeper server\uff1a<\/span>
      \r\n<\/p>\r\n

      \r\nbin\/zookeeper-server-start.shconfig\/zookeeper.properties&<\/span>
      \r\n<\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/p>\r\n

      \r\n\u542f\u52a8Kafka server:<\/span><\/p>\r\n

      \r\nbin\/kafka-server-start.shconfig\/server.properties&<\/span>
      \r\n<\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n\u8fd0\u884cproducer\uff1a<\/span><\/p>\r\n

      \r\nbin\/kafka-console-producer.sh--broker-listlocalhost:<\/span>9092<\/span>--topictest<\/span>
      \r\n<\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n\u8fd0\u884cconsumer\uff1a<\/span><\/p>\r\n

      \r\nbin\/kafka-console-consumer.sh--zookeeperlocalhost:<\/span>2181<\/span>--topictest--from-beginning<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n
      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n<\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/span><\/p>\r\n

      \r\n\u4e8c\u3001\u793a\u4f8b<\/p>\r\n

      \r\n<\/p>\r\n

      \r\n
      \r\n
      \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
      \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
        \r\n
      1. \r\npackagecom.hgp.kafka.kafka;<\/span><\/span><\/li>
      2. \r\n<\/span><\/li>
      3. \r\nimportjava.util.Arrays;<\/span><\/li>
      4. \r\nimportjava.util.HashMap;<\/span><\/li>
      5. \r\nimportjava.util.Iterator;<\/span><\/li>
      6. \r\nimportjava.util.Map;<\/span><\/li>
      7. \r\nimportjava.util.Map.Entry;<\/span><\/li>
      8. \r\nimportjava.util.concurrent.atomic.AtomicInteger;<\/span><\/li>
      9. \r\n<\/span><\/li>
      10. \r\nimportorg.apache.commons.logging.Log;<\/span><\/li>
      11. \r\nimportorg.apache.commons.logging.LogFactory;<\/span><\/li>
      12. \r\n<\/span><\/li>
      13. \r\nimportstorm.kafka.BrokerHosts;<\/span><\/li>
      14. \r\nimportstorm.kafka.KafkaSpout;<\/span><\/li>
      15. \r\nimportstorm.kafka.SpoutConfig;<\/span><\/li>
      16. \r\nimportstorm.kafka.StringScheme;<\/span><\/li>
      17. \r\nimportstorm.kafka.ZkHosts;<\/span><\/li>
      18. \r\nimportbacktype.storm.Config;<\/span><\/li>
      19. \r\nimportbacktype.storm.LocalCluster;<\/span><\/li>
      20. \r\nimportbacktype.storm.StormSubmitter;<\/span><\/li>
      21. \r\nimportbacktype.storm.generated.AlreadyAliveException;<\/span><\/li>
      22. \r\nimportbacktype.storm.generated.InvalidTopologyException;<\/span><\/li>
      23. \r\nimportbacktype.storm.spout.SchemeAsMultiScheme;<\/span><\/li>
      24. \r\nimportbacktype.storm.task.OutputCollector;<\/span><\/li>
      25. \r\nimportbacktype.storm.task.TopologyContext;<\/span><\/li>
      26. \r\nimportbacktype.storm.topology.OutputFieldsDeclarer;<\/span><\/li>
      27. \r\nimportbacktype.storm.topology.TopologyBuilder;<\/span><\/li>
      28. \r\nimportbacktype.storm.topology.base.BaseRichBolt;<\/span><\/li>
      29. \r\nimportbacktype.storm.tuple.Fields;<\/span><\/li>
      30. \r\nimportbacktype.storm.tuple.Tuple;<\/span><\/li>
      31. \r\nimportbacktype.storm.tuple.Values;<\/span><\/li>
      32. \r\n<\/span><\/li>
      33. \r\npublicclassMyKafkaTopology{<\/span><\/li>
      34. \r\n<\/span><\/li>
      35. \r\npublicstaticclassKafkaWordSplitterextendsBaseRichBolt{<\/span><\/li>
      36. \r\n<\/span><\/li>
      37. \r\nprivatestaticfinalLogLOG<\/span>=<\/span>LogFactory<\/span>.getLog(KafkaWordSplitter.class);<\/span><\/span><\/li>
      38. \r\nprivatestaticfinallongserialVersionUID<\/span>=<\/span>886149197481637894L<\/span>;<\/span><\/span><\/li>
      39. \r\nprivateOutputCollectorcollector;<\/span><\/li>
      40. \r\n<\/span><\/li>
      41. \r\n<\/span><\/li>
      42. \r\npublicvoidprepare(MapstormConf,TopologyContextcontext,<\/span><\/li>
      43. \r\nOutputCollectorcollector){<\/span><\/li>
      44. \r\nthis.collector<\/span>=<\/span>collector<\/span>;<\/span><\/span><\/li>
      45. \r\n}<\/span><\/li>
      46. \r\n<\/span><\/li>
      47. \r\n<\/span><\/li>
      48. \r\npublicvoidexecute(Tupleinput){<\/span><\/li>
      49. \r\nStringline<\/span>=<\/span>input<\/span>.getString(0);<\/span><\/span><\/li>
      50. \r\nLOG.info(\"RECV[kafka-><\/span>splitter]\"+line);<\/span><\/span><\/li>
      51. \r\nString[]words<\/span>=<\/span>line<\/span>.split(\"\\\\s+\");<\/span><\/span><\/li>
      52. \r\nfor(Stringword:words){<\/span><\/li>
      53. \r\nLOG.info(\"EMIT[splitter-><\/span>counter]\"+word);<\/span><\/span><\/li>
      54. \r\ncollector.emit(input,newValues(word,1));<\/span><\/li>
      55. \r\n}<\/span><\/li>
      56. \r\ncollector.ack(input);<\/span><\/li>
      57. \r\n}<\/span><\/li>
      58. \r\n<\/span><\/li>
      59. \r\n<\/span><\/li>
      60. \r\npublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){<\/span><\/li>
      61. \r\ndeclarer.declare(newFields(\"word\",\"count\"));<\/span><\/li>
      62. \r\n}<\/span><\/li>
      63. \r\n<\/span><\/li>
      64. \r\n}<\/span><\/li>
      65. \r\n<\/span><\/li>
      66. \r\npublicstaticclassWordCounterextendsBaseRichBolt{<\/span><\/li>
      67. \r\n<\/span><\/li>
      68. \r\nprivatestaticfinalLogLOG<\/span>=<\/span>LogFactory<\/span>.getLog(WordCounter.class);<\/span><\/span><\/li>
      69. \r\nprivatestaticfinallongserialVersionUID<\/span>=<\/span>886149197481637894L<\/span>;<\/span><\/span><\/li>
      70. \r\nprivateOutputCollectorcollector;<\/span><\/li>
      71. \r\nprivateMap<<\/span>String<\/span>,AtomicInteger<\/span>><\/span>counterMap;<\/span><\/span><\/li>
      72. \r\n<\/span><\/li>
      73. \r\n<\/span><\/li>
      74. \r\npublicvoidprepare(MapstormConf,TopologyContextcontext,<\/span><\/li>
      75. \r\nOutputCollectorcollector){<\/span><\/li>
      76. \r\nthis.collector<\/span>=<\/span>collector<\/span>;<\/span><\/span><\/li>
      77. \r\nthis.counterMap<\/span>=<\/span>new<\/span>HashMap<\/span><<\/span>String<\/span>,AtomicInteger<\/span>><\/span>();<\/span><\/span><\/li>
      78. \r\n}<\/span><\/li>
      79. \r\n<\/span><\/li>
      80. \r\n<\/span><\/li>
      81. \r\npublicvoidexecute(Tupleinput){<\/span><\/li>
      82. \r\nStringword<\/span>=<\/span>input<\/span>.getString(0);<\/span><\/span><\/li>
      83. \r\nintcount<\/span>=<\/span>input<\/span>.getInteger(1);<\/span><\/span><\/li>
      84. \r\nLOG.info(\"RECV[splitter-><\/span>counter]\"+word+\":\"+count);<\/span><\/span><\/li>
      85. \r\nAtomicIntegerai<\/span>=<\/span>this<\/span>.counterMap.get(word);<\/span><\/span><\/li>
      86. \r\nif(ai<\/span>==null){<\/span><\/span><\/li>
      87. \r\nai<\/span>=<\/span>new<\/span>AtomicInteger();<\/span><\/span><\/li>
      88. \r\nthis.counterMap.put(word,ai);<\/span><\/li>
      89. \r\n}<\/span><\/li>
      90. \r\nai.addAndGet(count);<\/span><\/li>
      91. \r\ncollector.ack(input);<\/span><\/li>
      92. \r\nLOG.info(\"CHECKstatisticsmap:\"+this.counterMap);<\/span><\/li>
      93. \r\n}<\/span><\/li>
      94. \r\n<\/span><\/li>
      95. \r\n<\/span><\/li>
      96. \r\npublicvoidcleanup(){<\/span><\/li>
      97. \r\nLOG.info(\"Thefinalresult:\");<\/span><\/li>
      98. \r\nIterator<<\/span>Entry<\/span><<\/span>String<\/span>,AtomicInteger<\/span>><\/span>><\/span><\/span>iter<\/span>=<\/span>this<\/span>.counterMap.entrySet().iterator();<\/span><\/span><\/li>
      99. \r\nwhile(iter.hasNext()){<\/span><\/li>
      100. \r\nEntry<<\/span>String<\/span>,AtomicInteger<\/span>><\/span><\/span>entry<\/span>=<\/span>iter<\/span>.next();<\/span><\/span><\/li>
      101. \r\nLOG.info(entry.getKey()+\"\\t:\\t\"+entry.getValue().get());<\/span><\/li>
      102. \r\n}<\/span><\/li>
      103. \r\n<\/span><\/li>
      104. \r\n}<\/span><\/li>
      105. \r\n<\/span><\/li>
      106. \r\n<\/span><\/li>
      107. \r\npublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){<\/span><\/li>
      108. \r\ndeclarer.declare(newFields(\"word\",\"count\"));<\/span><\/li>
      109. \r\n}<\/span><\/li>
      110. \r\n}<\/span><\/li>
      111. \r\n<\/span><\/li>
      112. \r\npublicstaticvoidmain(String[]args)throwsAlreadyAliveException,InvalidTopologyException,InterruptedException{<\/span><\/li>
      113. \r\nStringzks<\/span>=<\/span>\"localhost:2181\"<\/span>;<\/span><\/span><\/li>
      114. \r\nStringtopic<\/span>=<\/span>\"test\"<\/span>;<\/span><\/span><\/li>
      115. \r\nStringzkRoot<\/span>=<\/span>\"\/storm\"<\/span>;\/\/defaultzookeeperrootconfigurationforstorm<\/span><\/span><\/li>
      116. \r\nStringid<\/span>=<\/span>\"word\"<\/span>;<\/span><\/span><\/li>
      117. \r\n<\/span><\/li>
      118. \r\nBrokerHostsbrokerHosts<\/span>=<\/span>new<\/span>ZkHosts(zks);<\/span><\/span><\/li>
      119. \r\nSpoutConfigspoutConf<\/span>=<\/span>new<\/span>SpoutConfig(brokerHosts,topic,zkRoot,id);<\/span><\/span><\/li>
      120. \r\nspoutConf.scheme<\/span>=<\/span>new<\/span>SchemeAsMultiScheme(newStringScheme());<\/span><\/span><\/li>
      121. \r\nspoutConf.forceFromStart<\/span>=<\/span>true<\/span>;<\/span><\/span><\/li>
      122. \r\nspoutConf.zkServers<\/span>=<\/span>Arrays<\/span>.asList(newString[]{\"localhost\"});<\/span><\/span><\/li>
      123. \r\nspoutConf.zkPort<\/span>=<\/span>2181<\/span>;<\/span><\/span><\/li>
      124. \r\n<\/span><\/li>
      125. \r\nTopologyBuilderbuilder<\/span>=<\/span>new<\/span>TopologyBuilder();<\/span><\/span><\/li>
      126. \r\nbuilder.setSpout(\"kafka-reader\",newKafkaSpout(spoutConf),5);\/\/Kafka\u6211\u4eec\u521b\u5efa\u4e86\u4e00\u4e2a5\u5206\u533a\u7684Topic\uff0c\u8fd9\u91cc\u5e76\u884c\u5ea6\u8bbe\u7f6e\u4e3a5<\/span><\/li>
      127. \r\nbuilder.setBolt(\"word-splitter\",newKafkaWordSplitter(),2).shuffleGrouping(\"kafka-reader\");<\/span><\/li>
      128. \r\nbuilder.setBolt(\"word-counter\",newWordCounter()).fieldsGrouping(\"word-splitter\",newFields(\"word\"));<\/span><\/li>
      129. \r\n<\/span><\/li>
      130. \r\nConfigconf<\/span>=<\/span>new<\/span>Config();<\/span><\/span><\/li>
      131. \r\n<\/span><\/li>
      132. \r\nStringname<\/span>=<\/span>MyKafkaTopology<\/span>.class.getSimpleName();<\/span><\/span><\/li>
      133. \r\nif(args!=null&&args.length><\/span>0){<\/span><\/span><\/li>
      134. \r\n\/\/Nimbushostnamepassedfromcommandline<\/span><\/li>
      135. \r\nconf.put(Config.NIMBUS_HOST,args[0]);<\/span><\/li>
      136. \r\nconf.setNumWorkers(3);<\/span><\/li>
      137. \r\nStormSubmitter.submitTopologyWithProgressBar(name,conf,builder.createTopology());<\/span><\/li>
      138. \r\n}else{<\/span><\/li>
      139. \r\nconf.setMaxTaskParallelism(3);<\/span><\/li>
      140. \r\nLocalClustercluster<\/span>=<\/span>new<\/span>LocalCluster();<\/span><\/span><\/li>
      141. \r\ncluster.submitTopology(name,conf,builder.createTopology());<\/span><\/li>
      142. \r\nThread.sleep(60000);<\/span><\/li>
      143. \r\ncluster.shutdown();<\/span><\/li>
      144. \r\n}<\/span><\/li>
      145. \r\n}<\/span><\/li>
      146. \r\n}<\/span><\/li><\/ol>\r\n<\/div>\r\n
        \r\n
        \r\n

        \r\n<\/p>\r\n

        \r\npom.xml\u4ee3\u7801<\/p>\r\n

        \r\n<\/p>\r\n

        \r\n
        \r\n
        \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
        \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
          \r\n
        1. \r\n<<\/span>project<\/span><\/span>xmlns<\/span>=<\/span>\"http:\/\/maven.apache.org\/POM\/4.0.0\"<\/span><\/span>xmlns:xsi<\/span>=<\/span>\"http:\/\/www.w3.org\/2001\/XMLSchema-instance\"<\/span><\/span><\/span><\/li>
        2. \r\nxsi:schemaLocation<\/span>=<\/span>\"http:\/\/maven.apache.org\/POM\/4.0.0http:\/\/maven.apache.org\/xsd\/maven-4.0.0.xsd\"<\/span>><\/span><\/span><\/span><\/li>
        3. \r\n<<\/span>modelVersion<\/span>><\/span>4.0.0<\/span><\/<\/span>modelVersion<\/span>><\/span><\/span><\/span><\/li>
        4. \r\n<\/span><\/li>
        5. \r\n<\/span><\/li>
        6. \r\n<<\/span>groupId<\/span>><\/span>com.ymm<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        7. \r\n<<\/span>artifactId<\/span>><\/span>TestStorm<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        8. \r\n<<\/span>version<\/span>><\/span>0.0.1-SNAPSHOT<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        9. \r\n<<\/span>packaging<\/span>><\/span>jar<\/span><\/<\/span>packaging<\/span>><\/span><\/span><\/span><\/li>
        10. \r\n<\/span><\/li>
        11. \r\n<\/span><\/li>
        12. \r\n<<\/span>name<\/span>><\/span>TestStorm<\/span><\/<\/span>name<\/span>><\/span><\/span><\/span><\/li>
        13. \r\n<<\/span>url<\/span>><\/span>http:\/\/maven.apache.org<\/span><\/<\/span>url<\/span>><\/span><\/span><\/span><\/li>
        14. \r\n<\/span><\/li>
        15. \r\n<\/span><\/li>
        16. \r\n<<\/span>properties<\/span>><\/span><\/span><\/span><\/li>
        17. \r\n<<\/span>project.build.sourceEncoding<\/span>><\/span>UTF-8<\/span><\/<\/span>project.build.sourceEncoding<\/span>><\/span><\/span><\/span><\/li>
        18. \r\n<\/<\/span>properties<\/span>><\/span><\/span><\/span><\/li>
        19. \r\n<\/span><\/li>
        20. \r\n<\/span><\/li>
        21. \r\n<<\/span>dependencies<\/span>><\/span><\/span><\/span><\/li>
        22. \r\n<<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        23. \r\n<<\/span>groupId<\/span>><\/span>junit<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        24. \r\n<<\/span>artifactId<\/span>><\/span>junit<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        25. \r\n<<\/span>version<\/span>><\/span>3.8.1<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        26. \r\n<<\/span>scope<\/span>><\/span>test<\/span><\/<\/span>scope<\/span>><\/span><\/span><\/span><\/li>
        27. \r\n<\/<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        28. \r\n<<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        29. \r\n<<\/span>groupId<\/span>><\/span>org.apache.storm<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        30. \r\n<<\/span>artifactId<\/span>><\/span>storm-core<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        31. \r\n<<\/span>version<\/span>><\/span>0.10.0<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        32. \r\n<<\/span>scope<\/span>><\/span>provided<\/span><\/<\/span>scope<\/span>><\/span><\/span><\/span><\/li>
        33. \r\n<\/<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        34. \r\n<\/span><\/li>
        35. \r\n<\/span><\/li>
        36. \r\n<<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        37. \r\n<<\/span>groupId<\/span>><\/span>org.apache.storm<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        38. \r\n<<\/span>artifactId<\/span>><\/span>storm-kafka<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        39. \r\n<<\/span>version<\/span>><\/span>0.10.0<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        40. \r\n<\/<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        41. \r\n<\/span><\/li>
        42. \r\n<\/span><\/li>
        43. \r\n<<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        44. \r\n<<\/span>groupId<\/span>><\/span>org.apache.kafka<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        45. \r\n<<\/span>artifactId<\/span>><\/span>kafka_2.9.2<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        46. \r\n<<\/span>version<\/span>><\/span>0.8.1.1<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        47. \r\n<<\/span>exclusions<\/span>><\/span><\/span><\/span><\/li>
        48. \r\n<<\/span>exclusion<\/span>><\/span><\/span><\/span><\/li>
        49. \r\n<<\/span>groupId<\/span>><\/span>org.apache.zookeeper<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        50. \r\n<<\/span>artifactId<\/span>><\/span>zookeeper<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        51. \r\n<\/<\/span>exclusion<\/span>><\/span><\/span><\/span><\/li>
        52. \r\n<<\/span>exclusion<\/span>><\/span><\/span><\/span><\/li>
        53. \r\n<<\/span>groupId<\/span>><\/span>log4j<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        54. \r\n<<\/span>artifactId<\/span>><\/span>log4j<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        55. \r\n<\/<\/span>exclusion<\/span>><\/span><\/span><\/span><\/li>
        56. \r\n<\/<\/span>exclusions<\/span>><\/span><\/span><\/span><\/li>
        57. \r\n<\/span><\/li>
        58. \r\n<\/span><\/li>
        59. \r\n<\/<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        60. \r\n<\/span><\/li>
        61. \r\n<\/span><\/li>
        62. \r\n<<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        63. \r\n<<\/span>groupId<\/span>><\/span>commons-logging<\/span><\/<\/span>groupId<\/span>><\/span><\/span><\/span><\/li>
        64. \r\n<<\/span>artifactId<\/span>><\/span>commons-logging<\/span><\/<\/span>artifactId<\/span>><\/span><\/span><\/span><\/li>
        65. \r\n<<\/span>version<\/span>><\/span>1.1.1<\/span><\/<\/span>version<\/span>><\/span><\/span><\/span><\/li>
        66. \r\n<\/<\/span>dependency<\/span>><\/span><\/span><\/span><\/li>
        67. \r\n<\/span><\/li>
        68. \r\n<\/<\/span>dependencies<\/span>><\/span><\/span><\/span><\/li>
        69. \r\n<\/<\/span>project<\/span>><\/span><\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n
          \r\n
          \r\n
          \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
          \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
            \r\n
          1. \r\n<\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n
            \r\n
            \r\n
            \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
            \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
              \r\n
            1. \r\n<\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n
              \r\n
              \r\n
              \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
              \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
                \r\n
              1. \r\n\u4e09\u3001storm\u90e8\u7f72<\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n
                \r\n
                \r\n
                \r\n[html]<\/span>view\r\n plain<\/a>copy<\/a><\/span>\r\n
                \r\n<\/div>\r\n<\/span><\/div>\r\n<\/div>\r\n
                  \r\n
                1. \r\n<<\/span>p<\/span>><\/span><<\/span>span<\/span><\/span>style<\/span>=<\/span>\"color:rgb(85,85,85);font-family:Consolas,'BitstreamVerasansMono','Couriernew',Courier,monospace;font-size:14px;line-height:15.3906px;white-space:pre;\"<\/span>><\/span>1\uff09\u6253jar\u5305mvncleanpackage<\/span><\/<\/span>span<\/span>><\/span><\/span><\/span><\/li>
                2. \r\n<\/<\/span>p<\/span>><\/span><<\/span>p<\/span>><\/span><<\/span>span<\/span><\/span>style<\/span>=<\/span>\"color:rgb(85,85,85);font-family:Consolas,'BitstreamVerasansMono','Couriernew',Courier,monospace;font-size:14px;line-height:15.3906px;text-indent:28px;white-space:pre;\"<\/span>><\/span>2\uff09\u4e0a\u4f20storm\u96c6\u7fa4stormjarxxx.jarcom.sss.class<\/span><\/<\/span>span<\/span>><\/span><\/<\/span>p<\/span>><\/span><\/span><\/span><\/li><\/ol>\r\n<\/div>\r\n


                  \r\n<\/p>\r\n


                  \r\n<\/p>\r\n


                  \r\n<\/p>\r\n


                  \r\n<\/p>\r\n


                  \r\n<\/p>\r\n

                  <\/p>\r\n

                  \r\n1. ZooKeeper<\/h1>\r\n

                  <\/p>\r\n

                  \r\n\u5b89\u88c5\u53c2\u8003<\/a><\/p>\r\n

                  \r\n<\/a>2. Kafka<\/h1>\r\n

                  \r\n<\/a>2.1 \u89e3\u538b\u5b89\u88c5<\/h2>\r\n
                  # \u786e\u4fddscala\u5df2\u7ecf\u5b89\u88c5\u597d\uff0c\u672c\u6587\u5b89\u88c5\u7684\u662f2.11.7<\/span>\ntar -xf kafka_2.11<\/span>-0.9<\/span>.0.1<\/span>.tgz\ncd<\/span> kafka_2.11<\/span>-0.9<\/span>.0.1<\/span>\nmkdir logs\n\nvim ~\/.bash_profile\n\nexport<\/span> KAFKA_HOME=\/home\/zkpk\/kafka_2.11<\/span>-0.9<\/span>.0.1<\/span>\nexport<\/span> PATH=$PATH<\/span>:$KAFKA_HOME<\/span>\/bin\n\nsource<\/span> ~\/.bash_profile<\/code><\/pre>