前期收到的问题:
本篇看看storm是通过什么机制来保证消息至少处理一次的语义的,并回答第2个问题。

要说明上面的问题,得先了解storm中的一些原语,比如:
简单的关系如下所示:

上图展示了spout、bolts等形成了一个DAG,如何追踪这个DAG的执行过程,就是storm保证仅处理一次消息的语义的机制所在。

spout在调用emit/emitDirect方法发送tuple时,会以单播或者广播的方式,将消息发送给流的下游的component/task/bolt,如果配置了acker,那么会在每次emit调用之后,向acker发送请求ack的消息:
从上面的代码可以看出,每次emit tuple后,spout会向acker发送一个流ID为ACKER-INIT-STREAM-ID的消息,用于将DAG或者tuple-tree中的节点信息交给acker,acker会利用这个信息来追踪tuple-tree或DAG的完成。
而spout调用emit/emitDirect方法,将tuple发到下游的bolts,也同时会发送用于追踪DAG完成情况的信息:
这个追踪信息是什么呢?

如果是spout -> bolt或者bolt -> bolt,这个信息就是tuple的MessageId,其内部维护一个哈希表:
键为root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者经过的边(bolt),但这里没有利用任何常规意义上的“树”的算法,而是采用异或的方式来存储这个值:
如果是spout -> acker,或者bolt -> acker,那么用于追踪的是tuple的values:
下面给出上面调用的bit-xor-vals和bit-xor方法的代码:
说起来有点抽象,看个例子。
假设我们有1个spout,n个bolt,1个acker:
spout发送tuple到下游的bolts:
bolt收到tuple,在execute方法中进行必要的处理,然后调用emit方法,最后调用ack方法:
以上,可以看出bolt进行了emit-ack组合后,其自身在异或链中的作用消失了,也就是说tuple在此bolt得到了处理。
(当然,此时的ack还没有得到acker的确认,假设acker确认了,那么上面所说的tuple在bolt得到了处理就成立了。)
来看看acker的确认。
acker收到来自spout的tuple:
acker收到来自bolt的tuple:
可以看出,bolt_1向acker请求ack,acker收到请求ack,异或之后,id_1的作用消失。也就是说,bolt_1已处理完毕这个tuple。
所以,在acker看来,如果某个bolt的处理完成,则此bolt在异或链中的作用就消失了。
如果所有的bolt 都得到处理,那么acker将会观察到ackVal值变成了0:
如果出现了ackVal = 0,说明两个可能:
如果ackVal不为0,说明tuple-tree或DAG没有完成。如果长时间不为0,通过超时,可以触发一个超时回调,在这个回调中调用spout的fail方法,来进行重放。
如此,就保证了消息处理不会漏掉,但可能会重复。

以上,就是storm保证消息至少处理一次的语义的机制 。