内容简介
?
Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要怎么做,才能充分利用Storm的可靠性。理解一些实现细节,也能够帮助我们领悟Storm的设计理念。
?
PS:本文用到了Storm的一些基本概念,例如Bolt,任务(Task),元组(Tuple),如果不清楚这些概念,可以参看我之前写的文章:Storm介绍(一),理解Storm并发。下文中元组(Tuple),跟消息(message)是等价的,Storm中处理的消息是用元组这种数据结构来表示的。
?
一个消息被完整处理是什么意思?
?
流式计算单词个数的例子
?
考虑如下的流式计算文章中单词个数的拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme()));
builder.setBolt("split", new SplitStentence(), 10).shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20).fieldsGrouping("split", new Fields("word"));
?
这个拓扑由3个处理单元组成:一个叫"sentences"的Spout,负责从Kestrel队列中读取句子并作为新的Spout元组发送出去。名称为"split"的Bolt是Spout元组的下游消费方,它把接收到句子切分成单词并发送出去。名称为"count"的Bolt是"split" Bolt的下游消费方,它使用HashMap存储了每个任务中每个单词出现的次数,每次读取到新的单词元组就让该单词的计数加一。"count" Bolt接收"split" Bolt发出的消息时,是使用元组中的"word"(单词)字段来作为路由策略,所以相同的单词元组会被路由到相同的任务(task)里,这样就能够计数了。
?
消息(元组)树(message tree)
?
在下游的Bolt中会基于某个Spout元组发射出很多新的元组:句子中的每个单词会生成一个新元组(在split Bolt完成),每个单词的计数更新后(在count Bolt完成)也会触发一个新的元组。某个Spout元组触发的消息树如下图:
?
?
可以看到这棵消息树的根节点是Spout产生的句子内容为"the cow jumped over the moon"的元组。这个Spout元组在"split"这个Bolt里被切分为6个单词,触发了6个单词元组,"count" Bolt接收到这6个单词元组后,更新了每个单词的计数并为之产生了一个新的元组。
?
一条消息被“完整处理”
?
一条消息被“完整处理”
?
指一个从Spout发出的元组所触发的消息树中所有的消息都被Storm处理了。如果在指定的超时时间里,这个Spout元组触发的消息树中有任何一个消息没有处理完,就认为这个Spout元组处理失败了。这个超时时间是通过每个拓扑的Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置项来进行配置的,默认是30秒。
?
在前面消息树的例子里,只有消息树中所有的消息(包含一条Spout消息,六条split Bolt消息,六条count Bolt消息)都被Storm处理完了,才算是这条Spout消息被完整处理了。
?
消息被完整处理或者处理失败
?
当消息没有被完整处理或者处理失败了会怎么样?为了理解这个问题,应该首先看一下Spout发出的一个元组的生命周期。Spout需要实现的接口(接口文档见这里)如下:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
?
首先,Storm通过调用Spout的nextTuple函数来从Spout请求一个元组。Spout任务使用open函数入参中提供的SpoutOutputCollector来给Spout任务的某个输出流发射一个新元组。当发射一个元组时,Spout提供了一个"消息标识"(message-id),用来后续识别这个元组。例如,上面的例子里,sentence Spout从Kestrel队列中读取一条消息,然后把Kestrel提供的这个消息的message-id作为"消息标识"来发送出去。向SpoutOutputCollector中发送消息的例子如下:
?
_collector.emit(new Values("the cow jumped over the moon"), msgId);
接下来,元组就被发送到下游的Bolt进行消费,Storm会负责跟踪这个Spout元组创建的消息树。如果Storm检测到一个元组被完整地处理了,Storm会调用产生这个元组的Spout任务(Spout Bolt有多个任务来运行)的ack函数,参数是Spout之前发送这个消息时提供给Storm的message-id。类似的,当元组处理超时或处理失败时,Storm会在元组对应的Spout任务上调用fail函数,参数是之前Spout发送这个消息时提供给Storm的message-id。这样应用程序通过实现Spout Bolt中的ack接口和fail接口来处理消息处理成功和失败的情况。例如当消息处理成功时记录当前处理的进度,当处理失败时,重新发送消息来对这个消息进行重新处理。但在本文的例子里fail函数中不需要做任何处理,因为这些元组不会从Kestrel队列中去掉,下次从队列取消息,仍然会取到这些消息,只有处理成功后,才会从Kestrel队列中摘除这些消息。
?
Storm的可靠性API
?
作为Storm用户,如果想利用Storm的可靠性,需要做两件事:
?
1. 创建一个元组时(消息树上创建一个新节点)需要通知Storm
2. 处理完一个元组,需要通知Storm
通过这两个操作,当消息树被完全处理完,Storm就可以立即检测到,从而可以正确地确认这个Spout元组处理成功或者失败。Storm的API提供了一套简洁地处理这些操作的方法。
?
元组创建时通知Storm
?
在Storm消息树(元组树)中