设为首页 加入收藏

TOP

storm, kafka集成之本地开发、测试
2019-01-06 02:32:05 】 浏览:100
Tags:storm kafka 集成 本地 开发 测试

转自:http://blog.csdn.net/xeseo/article/details/18615761 有删改


A. 使用KafkaSpout

一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:
  • Kafka集群中的Broker地址 (IP+Port)
有两种方法指定:
1. 使用静态地址,即直接给定Kafka集群中所有Broker信息
[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. GlobalPartitionInformationinfo=newGlobalPartitionInformation();
  2. info.addPartition(0,newBroker("10.1.110.24",9092));
  3. info.addPartition(0,newBroker("10.1.110.21",9092));
  4. BrokerHostsbrokerHosts=newStaticHosts(info);

2. 从Zookeeper动态读取
[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. BrokerHostsbrokerHosts=newZkHosts("10.1.110.24:2181,10.1.110.22:2181");
推荐使用这种方法,因为Kafka的Broker可能会动态的增减

  • topic名字
  • 当前spout的唯一标识Id (以下代称$spout_id)(应该是topic-ID)
  • zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root(Zookeeper中存储offset的ZNode))
  • 当前topic中数据如何解码
了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。

在Topology中加入Spout的代码:
[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. Stringtopic="test";
  2. StringzkRoot="kafkastorm";
  3. StringspoutId="myKafka";
  4. SpoutConfigspoutConfig=newSpoutConfig(brokerHosts,topic,zkRoot,spoutId);
  5. spoutConfig.scheme=newSchemeAsMultiScheme(newTestMessageScheme());
  6. TopologyBuilderbuilder=newTopologyBuilder();
  7. builder.setSpout("spout",newKafkaSpout(spoutConfig),spoutNum);

其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据

[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. publicclassTestMessageSchemeimplementsScheme{
  2. privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(TestMessageScheme.class);
  3. @Override
  4. publicList<Object>deserialize(byte[]bytes){
  5. try{
  6. Stringmsg=newString(bytes,"UTF-8");
  7. returnnewValues(msg);
  8. }catch(InvalidProtocolBufferExceptione){
  9. LOGGER.error("Cannotparsetheprovidedmessage!");
  10. }
  11. //TODO:whathappendifreturnsnull
  12. returnnull;
  13. }
  14. @Override
  15. publicFieldsgetOutputFields(){
  16. returnnewFields("msg");
  17. }
  18. }
这个解码方式是与Producer端生成时,写入数据的编码方式配套的。这里我Producer端写入的是String的byte,所以这里也还原成String,定义输出为一个名叫"msg"的field。

后面就可以自己添加Bolt处理tuple中该field的数据了。


B.使用TransactionalTridentKafkaSpout

TransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。
[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. TridentKafkaConfigkafkaConfig=newTridentKafkaConfig(brokerHosts,topic,spoutId);
  2. kafkaConfig.scheme=newSchemeAsMultiScheme(newTestMessageScheme());
  3. TransactionalTridentKafkaSpoutkafkaSpout=newTransactionalTridentKafkaSpout(kafkaConfig);
  4. TridentTopologytopology=newTridentTopology();
  5. topology.newStream("test_str",kafkaSpout).shuffle().each(newFields("msg",newPrintFunction());

看到它并没有要求我们提供zkRoot,因为直接代码里面写死了…… -_-T
地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是 /transactional/test_str/myKafaka


C.常见问题

本地模式无法保存Offset
KafkaSpout初始化时,会去取spoutConfig.zkServers 和spoutConfig.zkPort 变量的值,而该值默认是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口。而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。
本地模式,可以显示指定一个持续运行的Zookeeper用于存储当前spout在kafka中消费数据的offset。
[java]view plaincopy
print在CODE上查看代码片派生到我的代码片
  1. spoutConfig.zkServers=newArrayList<String>(){{
  2. add("10.1.110.20");
  3. add("10.1.110.21");
  4. add("10.1.110.24");
  5. }};
  6. spoutConfig.zkPort=2181;
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇新版本 Kafka Consumer 的设计原理 下一篇kafka学习五:开发consumer

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目