设为首页 加入收藏

TOP

Storm如何保证可靠的消息处理(二)
2015-11-21 01:34:23 来源: 作者: 【 】 浏览:3
Tags:Storm 如何 保证 可靠 消息 处理
添加一个子结点的操作叫做锚定(anchoring)。在应用程序发送一个新元组时候,Storm会在幕后做锚定。还是之前的流式计算单词个数的例子,请看如下的代码片段:
?
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector){
_collector = collector;
}

public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
} 
_collector.ack(tuple);
} 

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
} 
} 

?

每个单词元组是通过把输入的元组作为emit函数中的第一个参数来做锚定的。通过锚定,Storm就能够得到元组之间的关联关系(输入元组触发了新的元组),继而构建出Spout元组触发的整个消息树。所以当下游处理失败时,就可以通知Spout当前消息树根节点的Spout元组处理失败,让Spout重新处理。相反,如果在emit的时候没有指定输入的元组,叫做不锚定:
?
_collector.emit(new Values(word));
?
这样发射单词元组,会导致这个元组不被锚定(unanchored),这样Storm就不能得到这个元组的消息树,继而不能跟踪消息树是否被完整处理。这样下游处理失败,不能通知到上游的Spout任务。不同的应用的有不同的容错处理方式,有时候需要这样不锚定的场景。
?
一个输出的元组可以被锚定到多个输入元组上,叫做多锚定(multi-anchoring)。这在做流的合并或者聚合的时候非常有用。一个多锚定的元组处理失败,会导致Spout上重新处理对应的多个输入元组。多锚定是通过指定一个多个输入元组的列表而不是单个元组来完成的。例如:
List anchors = new ArrayList();
anchors.add(tuple1);  
anchors.add(tuple2);
_collector.emit(anchors, new Values(word));

?

多锚定会把这个新输出的元组添加到多棵消息树上。注意多锚定可能会打破消息的树形结构,变成有向无环图(DAG),Storm的实现既支持树形结构,也支持有向无环图(DAG)。在本文中,提到的消息树跟有向无环图是等价的。消息之间的关系是有向无环图的例子见下图:
?
?
Spout元组A触发了B和C两个元组,而这两个元组作为输入,共同作用后触发D元组。
?
元组处理完后通知Storm
?
锚定的作用就是指定元组树的结构--下一步是当元组树中某个元组已经处理完成时,通知Storm。通知是通过OutputCollector中的ack和fail函数来完成的。例如上面流式计算单词个数例子中的split Bolt的实现SplitSentence类,可以看到句子被切分成单词后,当所有的单词元组都被发射后,会确认(ack)输入的元组处理完成。
?
可以利用OutputCollector的fail函数来立即通知Storm,当前消息树的根元组处理失败了。例如,应用程序可能捕捉到了 数据库客户端的一个异常,就显示地通知Storm输入元组处理失败。通过显示地通知Storm元组处理失败,这个Spout元组就不用等待超时而能更快地被重新处理。
?
Storm需要占用内存来跟踪每个元组,所以每个被处理的元组都必须被确认。因为如果不对每个元组进行确认,任务最终会耗光可用的内存。
?
做聚合或者合并操作的Bolt可能会延迟确认一个元组,直到根据一堆元组计算出了一个结果后,才会确认。聚合或者合并操作的Bolt,通常也会对他们的输出元组进行多锚定。
?
Storm 0.7.0引入了“事务拓扑”(transactional topologies)的特性,它让你在大多数场景下能够得到完全容错的只被处理一次的消息语义。更多关于事物拓扑的介绍见这里
?
Storm怎样高效的实现可靠性?
?
acker任务
?
一个Storm拓扑有一组特殊的"acker"任务,它们负责跟踪由每个Spout元组触发的消息的处理状态。当一个"acker"看到一个Spout元组产生的有向无环图中的消息被完全处理,就通知当初创建这个Spout元组的Spout任务,这个元组被成功处理。可以通过拓扑配置项Config.TOPOLOGY_ACKER_EXECUTORS来设置一个拓扑中acker任务executor的数量。Storm默认TOPOLOGY_ACKER_EXECUTORS和拓扑中配置的Worker的数量相同(关于executor和Worker的介绍,参见理解Storm并发一文)--对于需要处理大量消息的拓扑来说,需要增大acker executor的数量。
?
元组的生命周期
?
理解Storm的可靠性实现方式的最好方法是查看元组的生命周期和元组构成的有向无环图。当拓扑的Spout或者Bolt中创建一个元组时,都会被赋予一个随机的64比特的标识(message id)。acker任务使用这些id来跟踪每个Spout元组产生的有向无环图的处理状态。在Bolt中产生一个新的元组时,会从锚定的一个或多个输入元组中拷贝所有Spout元组的message-id,所以每个元组都携带了自己所在元组树的根节点Spout元组的message-id。当确认一个元组处理成功了,Storm就会给对应的acker任务发送特定的消息--通知acker当前这个Spout元组产生的消息树中某个消息处理完了,而且这个特定消息在消息树中又产生了一个新消息(新消息锚定的输入是这个特定的消息)。
?
举个例子,假设"D"元组和"E"元组是基于“C”元组产生的,那么下图描述了确认“C”元组成功处理后,元组树的变化。图中虚线框表示的元组代表已经在消息树上被删除了:
?
?
由于在“C”从消息树中删除(通过acker函数确认成功处理)的同时,“D”和“E”也被添加到(通过emit函数来锚定的)元组树中,所以这棵树从来不会被提早处理完。
?
正如上面已经提到的,在一个拓扑中,可以有任意数量的acker任务。这导致了如下的两个问题:
?
当拓扑中的一个元组确认被处理完,或者产生一个新的元组时,Storm应该通知哪个acker任务?
通知了acker任务后,acker任务如何通
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇OCP-V13-696 下一篇OCP-V13-681

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: