y Key,而Value可以理解为一行记录。可以认为KTable中的数据都是通过Update only的方式进入的。也就意味着,如果KTable对应的Topic中新进入的数据的Key已经存在,那么从KTable只会取出同一Key对应的最后一条数据,相当于新的数据更新了旧的数据。
以下图为例,假设有一个KStream和KTable,基于同一个Topic创建,并且该Topic中包含如下图所示5条数据。此时遍历KStream将得到与Topic内数据完全一样的所有5条数据,且顺序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将得到3条记录,每个Key对应最新的值,并且这三条数据之间的顺序与原来在Topic中的顺序保持一致。这一点与Kafka的日志compact相同。
此时如果对该KStream和KTable分别基于key做Group,对Value进行Sum,得到的结果将会不同。对KStream的计算结果是<Jack,4>
,<Lily,7>
,<Mike,4>
。而对Ktable的计算结果是<Mike,4>
,<Jack,3>
,<Lily,5>
。
2.5 State store
流式处理中,部分操作是无状态的,例如过滤操作(Kafka Stream DSL中用filer
方法实现)。而部分操作是有状态的,需要记录中间状态,如Window操作和聚合计算。State store被用来存储中间状态。它可以是一个持久化的Key-Value存储,也可以是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储。
Topic中存储的数据记录本身是Key-Value形式的,同时Kafka的log compaction机制可对历史数据做compact操作,保留每个Key对应的最后一个Value,从而在保证Key不丢失的前提下,减少总数据量,从而提高查询效率。
构造KTable时,需要指定其state store name。默认情况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,实际就是遍历它对应的state store,或者说遍历Topic的所有key,并取每个Key最新值的过程。为了使得该过程更加高效,默认情况下会对该Topic进行compact操作。
另外,除了KTable,所有状态计算,都需要指定state store name,从而记录中间状态。
3 Kafka Stream如何解决流式系统中关键问题
3.1 时间
在流式数据处理中,时间是数据的一个非常重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增加了timestamp
属性。目前Kafka Stream支持三种时间
- 事件发生时间。事件发生的时间,包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定。并且需要Broker或者Topic将
message.timestamp.type
设置为CreateTime
(默认值)才能生效。
- 消息接收时间,也即消息存入Broker的时间。当Broker或Topic将
message.timestamp.type
设置为LogAppendTime
时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp
属性值设置为当前机器时间。一般消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。
- 消息处理时间,也即Kafka Stream处理消息时的时间。
注:Kafka Stream允许通过实现org.apache.kafka.streams.processor.TimestampExtractor
接口自定义记录时间。
3.2 窗口
前文提到,流式数据是在时间上无界的数据。而聚合操作只能作用在特定的数据集,也即有界的数据集上。因此需要通过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种非常常用的设定计算边界的方式。不同的流式处理系统支持的窗口类似,但不尽相同。
Kafka Stream支持的窗口如下。
Hopping Time Window
该窗口定义如下图所示。它有两个属性,一个是Window size,一个是Advance interval。Window size指定了窗口的大小,也即每次计算的数据集的大小。而Advance interval定义输出的时间间隔。一个典型的应用场景是,每隔5秒钟输出一次过去1个小时内网站的PV或者UV。
Tumbling Time Window
该窗口定义如下图所示。可以认为它是Hopping Time Window的一种特例,也即Window size和Advance interval相等。它的特点是各个Window之间完全不相交。
Sliding Window
该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。
Session Window
该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。
3.3 Join
Kafka Stream由于包含KStream和Ktable两种数据集,因此提供如下Join计算
KTable Join KTable
结果仍为KTable。任意一边有更新,结果KTable都会更新。
KStream Join KStream
结果为KStream。必须带窗口操作,否则会造成Join操作一直不结束。
KStream Join KTable / GlobalKTable
结果为KStream。只有当KStream中有新数据时,才会触发Join计算并输出结果。KStream无新数据时,KTable的更新并不会触发Join计算,也不会输出数据。并且该更新只对下次Join生效。一个典型的使用场景是,KStream中的订单信息与KTable中的用户信息做关联计算。
对于Join操作,如果要得到正确的计算结果,需要保证参与Join的KTable或KStream中Key相同的数据被分配到同一个Task。具体方法是
- 参与Join的KTable或KStream的Key类型相同(实际上,业务含意也应该相同)
- 参与Join的KTable或KStream对应的Topic的Partition数相同
- Partitioner策略的最终结果等效(实现不需要完全一样,只要效果一样即可),也即Key相同的情况下,被分配到ID相同的Partition内
如果上述条件不满足,可通过调用如下方法使得它满足上述条件。
KStream<K, V> through