我们现在用 kafka
和 ems
两种方式来接收外部消息,之前没接收一条消息就 ack,
系统当前消息量大概接近亿级每天,集中在工作时间的八到十个小时。这意味着每个消息都 ack
会消耗大量网络资源,拖慢消息处理速度。因此决定用批量 ack
来降低网络消耗。
实现过程中碰到一个问题,假设没10条
ack 一次,那如果有37条数据,意味着前10条可以成功
ack,而后 7
条由于没有凑够 batchsize
有可能会一直不 ack。
对于 kafkaconsumer
来说,consumer
调用 poll
方法主动从服务器获取消息,这个方法可以接受 timeout
参数。这时即使没有达到 batch size,依然有机会在 timeout
时 act.
而对于 EMS,
就有点麻烦。
EMS client
有两种消息接收方式。一种是 client
主动调用consume
方法从服务器上获取消息。这种方式和 kafka
一样,可以接受timeout
参数,因此不是问题。
问题在于另一种方式,onMessage()它是事件驱动的,不接受
timeout 参数。这样当最后7条消息收到以后它会一直等待后面的消息到来。
这时客户端已经接收并处理完前7条,但还没有机会 ack,
如果客户端这时挂掉,那么这7条处理过的消息再也没机会被 ack。
这会导致消息重复处理。解决办法是开一个守护线程来定时做 ack,由于 EMS ack
依赖于具体消息,因此每次收到一条消息都要 cache
下来,并且几下最后接收到的时间,然后用这最后一条消息来做 ack。
Kafka
和 EMS
一样都支持最后一条消息 ack
时,前面的消息就自动 ack。